Compare commits

...

66 Commits

Author SHA1 Message Date
Cédric Verstraeten
e8ca776e4e Merge pull request #236 from kerberos-io/fix/debugging-lost-keyframes
fix/debugging-lost-keyframes
2026-02-11 16:51:07 +01:00
Cédric Verstraeten
de5c4b6e0a Merge branch 'master' into fix/debugging-lost-keyframes 2026-02-11 16:48:08 +01:00
Cédric Verstraeten
9ba64de090 add additional logging 2026-02-11 16:48:01 +01:00
Cédric Verstraeten
7ceeebe76e Merge pull request #235 from kerberos-io/fix/debugging-lost-keyframes
fix/debugging-lost-keyframes
2026-02-11 16:15:57 +01:00
Cédric Verstraeten
bd7dbcfcf2 Enhance FPS tracking and logging for keyframes in gortsplib and mp4 modules 2026-02-11 15:11:52 +00:00
Cédric Verstraeten
8c7a46e3ae Merge pull request #234 from kerberos-io/fix/fps-gop-size
fix/fps-gop-size
2026-02-11 15:05:31 +01:00
Cédric Verstraeten
57ccfaabf5 Merge branch 'fix/fps-gop-size' of github.com:kerberos-io/agent into fix/fps-gop-size 2026-02-11 14:59:34 +01:00
Cédric Verstraeten
4a9cb51e95 Update machinery/src/capture/gortsplib.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-11 14:59:15 +01:00
Cédric Verstraeten
ab6f621e76 Merge branch 'fix/fps-gop-size' of github.com:kerberos-io/agent into fix/fps-gop-size 2026-02-11 14:58:44 +01:00
Cédric Verstraeten
c365ae5af2 Ensure thread-safe closure of peer connections in InitializeWebRTCConnection 2026-02-11 13:58:29 +00:00
Cédric Verstraeten
b05c3d1baa Update machinery/src/capture/gortsplib.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-11 14:52:40 +01:00
Cédric Verstraeten
c7c7203fad Merge branch 'master' into fix/fps-gop-size 2026-02-11 14:48:05 +01:00
Cédric Verstraeten
d93f85b4f3 Refactor FPS calculation to use per-stream trackers for improved accuracy 2026-02-11 13:45:07 +00:00
Cédric Verstraeten
031212b98c Merge pull request #232 from kerberos-io/fix/fps-gop-size
fix/fps-gop-size
2026-02-11 14:27:18 +01:00
Cédric Verstraeten
a4837b3cb3 Implement PTS-based FPS calculation and GOP size adjustments 2026-02-11 13:14:29 +00:00
Cédric Verstraeten
77629ac9b8 Merge pull request #231 from kerberos-io/feature/improve-keyframe-interval
feature/improve-keyframe-interval
2026-02-11 12:28:33 +01:00
cedricve
59608394af Use Warning instead of Warn in mp4.go
Replace call to log.Log.Warn with log.Log.Warning in MP4.flushPendingVideoSample to match the logger API. This is a non-functional change that preserves the original message and behavior while using the correct logging method name.
2026-02-11 12:26:18 +01:00
cedricve
9dfcaa466f Refactor video sample flushing logic into a dedicated function 2026-02-11 11:48:15 +01:00
cedricve
88442e4525 Add pending video sample to segment before flush
Before flushing a segment when mp4.Start is true, add any pending VideoFullSample for the current video track to the current fragment. The change computes and updates LastVideoSampleDTS and VideoTotalDuration, adjusts the sample DecodeTime and Dur, calls AddFullSampleToTrack, logs errors, and clears VideoFullSample so the pending sample is included in the segment before starting a new one. This ensures segments contain all frames up to (but not including) the keyframe that triggered the flush.
2026-02-11 11:38:51 +01:00
Cédric Verstraeten
891ae2e5d5 Merge pull request #230 from kerberos-io/feature/improve-video-format
feature/improve-video-format
2026-02-10 17:25:23 +01:00
Cédric Verstraeten
32b471f570 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:40 +01:00
Cédric Verstraeten
5d745fc989 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:29 +01:00
Cédric Verstraeten
edfa6ec4c6 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:16 +01:00
Cédric Verstraeten
0c460efea6 Refactor PR description workflow to include organization variable and correct pull request URL format 2026-02-10 16:17:10 +00:00
Cédric Verstraeten
96df049e59 Enhance MP4 initialization by adding max recording duration parameter, improving placeholder size calculation for segments. 2026-02-10 15:59:59 +00:00
Cédric Verstraeten
2cb454e618 Merge branch 'master' into feature/improve-video-format 2026-02-10 16:57:47 +01:00
Cédric Verstraeten
7f2ebb655e Fix sidx.FirstOffset calculation and re-encode init segment for accurate MP4 structure 2026-02-10 15:56:10 +00:00
Cédric Verstraeten
63857fb5cc Merge pull request #229 from kerberos-io/feature/improve-video-format
feature/improve-video-format
2026-02-10 16:53:34 +01:00
Cédric Verstraeten
f4c75f9aa9 Add environment variables for PR number and project name in workflow 2026-02-10 15:31:37 +00:00
Cédric Verstraeten
c3936dc884 Enhance MP4 segment handling by adding segment durations and base decode times, improving fragment management and data integrity 2026-02-10 14:47:47 +00:00
Cédric Verstraeten
2868ddc499 Add fragment duration handling and improve MP4 segment management 2026-02-10 13:52:58 +00:00
Cédric Verstraeten
176610a694 Update mp4.go 2026-02-10 13:39:55 +01:00
Cédric Verstraeten
f60aff4fd6 Enhance MP4 closing process by adding final video and audio samples, ensuring data integrity and updating track metadata 2026-02-10 12:45:46 +01:00
Cédric Verstraeten
847f62303a Merge pull request #228 from kerberos-io/feature/improve-webrtc-tracing
feature/improve-webrtc-tracing
2026-01-23 15:22:45 +01:00
Cédric Verstraeten
f174e2697e Enhance WebRTC handling with connection management and error logging improvements 2026-01-23 14:16:55 +00:00
Cédric Verstraeten
acac2d5d42 Refactor main function to improve code structure and readability 2026-01-23 13:48:24 +00:00
Cédric Verstraeten
f304c2ed3e Merge pull request #219 from kerberos-io/fix/release-process
fix/release-process
2025-09-17 16:32:58 +02:00
cedricve
2003a38cdc Add release creation workflow with multi-arch Docker builds and artifact handling 2025-09-17 14:32:06 +00:00
Cédric Verstraeten
a67c5a1f39 Merge pull request #216 from kerberos-io/feature/upgrade-build-process-avoid-base
feature/upgrade-build-process-avoid-base
2025-09-11 16:22:53 +02:00
Cédric Verstraeten
b7a87f95e5 Update Docker workflow to use Ubuntu 24.04 and simplify build steps for multi-arch images 2025-09-11 15:00:37 +02:00
Cédric Verstraeten
0aa0b8ad8f Refactor build steps in PR workflow to streamline Docker operations and improve artifact handling 2025-09-11 14:09:22 +02:00
Cédric Verstraeten
2bff868de6 Update upload artifact action to v4 in PR build workflow 2025-09-11 13:45:34 +02:00
Cédric Verstraeten
8b59828126 Add steps to strip binary and upload artifact in PR build workflow 2025-09-11 13:39:27 +02:00
Cédric Verstraeten
f55e25db07 Remove Golang build steps from Dockerfiles for amd64 and arm64 2025-09-11 10:29:05 +02:00
Cédric Verstraeten
243c969666 Add missing go version check in Dockerfile build step 2025-09-11 10:26:54 +02:00
Cédric Verstraeten
ec7f2e0303 Update ARM64 build step to specify Dockerfile for architecture 2025-09-11 10:18:19 +02:00
Cédric Verstraeten
a4a032d994 Update GitHub Actions workflow and Dockerfiles for architecture support and dependency management 2025-09-11 10:17:51 +02:00
Cédric Verstraeten
0a84744e49 Remove arm-v6 architecture from build matrix in PR workflow 2025-09-09 14:38:51 +00:00
Cédric Verstraeten
1425430376 Update .gitignore to include __debug* and change Dockerfile base image to golang:1.24.5-bullseye 2025-09-09 14:36:32 +00:00
Cédric Verstraeten
ca8d88ffce Update GitHub Actions workflow to support multiple architectures in build matrix 2025-09-09 14:34:39 +00:00
Cédric Verstraeten
af3f8bb639 Add GitHub Actions workflow for pull request builds and update Dockerfile dependencies 2025-09-09 16:28:19 +02:00
Cédric Verstraeten
1f9772d472 Merge pull request #212 from kerberos-io/fix/ovrride-base-width
fix/ovrride-base-width
2025-08-12 07:05:43 +02:00
cedricve
94cf361b55 Reset baseWidth and baseHeight in StoreConfig function 2025-08-12 04:47:50 +00:00
cedricve
6acdf258e7 Fix typo in environment variable override function name 2025-08-11 21:10:33 +00:00
cedricve
cc0a810ab3 Handle both baseWidth and baseHeight in IPCamera config
Adds logic to set IPCamera BaseWidth and BaseHeight when both values are provided, instead of only calculating aspect ratio. Also fixes a typo in the function call to override configuration with environment variables.
2025-08-11 23:06:24 +02:00
Cédric Verstraeten
c19bfbe552 Merge pull request #211 from kerberos-io/feature/minimize-sd-view-image
feature/minimize-sd-view-image
2025-08-11 12:30:01 +02:00
Cédric Verstraeten
39aaf5ad6c Merge branch 'feature/minimize-sd-view-image' of github.com:kerberos-io/agent into feature/minimize-sd-view-image 2025-08-11 10:25:31 +00:00
Cédric Verstraeten
6fba2ff05d Refactor logging in gortsplib and mp4 modules to use Debug and Error levels; update free box size in MP4 initialization 2025-08-11 10:20:37 +00:00
Cédric Verstraeten
d78e682759 Update config.json 2025-08-11 11:39:45 +02:00
Cédric Verstraeten
ed582a9d57 Resize polygon coordinates based on IPCamera BaseWidth and BaseHeight configuration 2025-08-11 09:38:24 +00:00
Cédric Verstraeten
aa925d5c9b Add BaseWidth and BaseHeight configuration options for IPCamera; update resizing logic in RunAgent and websocket handlers 2025-08-11 09:23:11 +00:00
Cédric Verstraeten
08d191e542 Update image resizing to support dynamic height; modify related functions and configurations 2025-08-11 08:08:39 +00:00
Cédric Verstraeten
cc075d7237 Refactor IPCamera configuration to include BaseWidth and BaseHeight; update image resizing logic to use dynamic width based on configuration 2025-08-06 14:42:23 +00:00
Cédric Verstraeten
1974bddfbe Merge pull request #210 from kerberos-io/feature/minimize-sd-view-image
feature/minimize-sd-view-image
2025-07-30 15:42:06 +02:00
Cédric Verstraeten
12cb88e1c1 Replace fmt.Println with log.Log.Debug for buffer size in ImageToBytes function 2025-07-30 13:34:14 +00:00
Cédric Verstraeten
c054526998 Add image resizing functionality and update dependencies
- Introduced ResizeImage function to resize images before encoding.
- Updated ImageToBytes function to accept pointer to image.
- Added nfnt/resize library for image resizing.
- Updated go.mod and go.sum to include new dependencies.
- Updated image processing in HandleLiveStreamSD, GetSnapshotRaw, and other functions to use resized images.
- Updated yarn.lock for ui package version change.
2025-07-30 12:06:12 +00:00
26 changed files with 1480 additions and 733 deletions

View File

@@ -1,120 +0,0 @@
name: Create a new release
on:
release:
types: [created]
workflow_dispatch:
inputs:
tag:
description: "Tag for the Docker image"
required: true
default: "test"
env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-latest
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}} --push .
- name: Create new and append to manifest
run: docker buildx imagetools create -t $REPO:${{ github.event.inputs.tag || github.ref_name }} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Create new and append to manifest latest
run: docker buildx imagetools create -t $REPO:latest $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
if: github.event.inputs.tag == 'test'
- name: Run Buildx with output
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-$(echo ${{matrix.architecture}} | tr / -)-${{github.event.inputs.tag || github.ref_name}} --output type=tar,dest=output-${{matrix.architecture}}.tar .
- name: Strip binary
run: mkdir -p output/ && tar -xf output-${{matrix.architecture}}.tar -C output && rm output-${{matrix.architecture}}.tar && cd output/ && tar -cf ../agent-${{matrix.architecture}}.tar -C home/agent . && rm -rf output
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-${{matrix.architecture}}.tar"
# Taken from GoReleaser's own release workflow.
# The available Snapcraft Action has some bugs described in the issue below.
# The mkdirs are a hack for https://github.com/goreleaser/goreleaser/issues/1715.
#- name: Setup Snapcraft
# run: |
# sudo apt-get update
# sudo apt-get -yq --no-install-suggests --no-install-recommends install snapcraft
# mkdir -p $HOME/.cache/snapcraft/download
# mkdir -p $HOME/.cache/snapcraft/stage-packages
#- name: Use Snapcraft
# run: tar -xf agent-${{matrix.architecture}}.tar && snapcraft
build-other:
runs-on: ubuntu-latest
permissions:
contents: write
needs: build-amd64
strategy:
matrix:
architecture: [arm64, arm-v7, arm-v6]
#architecture: [arm64, arm-v7]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}} --push .
- name: Create new and append to manifest
run: docker buildx imagetools create --append -t $REPO:${{ github.event.inputs.tag || github.ref_name }} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Create new and append to manifest latest
run: docker buildx imagetools create --append -t $REPO:latest $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
if: github.event.inputs.tag == 'test'
- name: Run Buildx with output
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-$(echo ${{matrix.architecture}} | tr / -)-${{github.event.inputs.tag || github.ref_name}} --output type=tar,dest=output-${{matrix.architecture}}.tar .
- name: Strip binary
run: mkdir -p output/ && tar -xf output-${{matrix.architecture}}.tar -C output && rm output-${{matrix.architecture}}.tar && cd output/ && tar -cf ../agent-${{matrix.architecture}}.tar -C home/agent . && rm -rf output
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-${{matrix.architecture}}.tar"

75
.github/workflows/pr-build.yml vendored Normal file
View File

@@ -0,0 +1,75 @@
name: Build pull request
on:
pull_request:
types: [opened, synchronize]
env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar

View File

@@ -2,6 +2,11 @@ name: Autofill PR description
on: pull_request
env:
ORGANIZATION: uugai
PROJECT: ${{ github.event.repository.name }}
PR_NUMBER: ${{ github.event.number }}
jobs:
openai-pr-description:
runs-on: ubuntu-22.04
@@ -16,4 +21,6 @@ jobs:
azure_openai_api_key: ${{ secrets.AZURE_OPENAI_API_KEY }}
azure_openai_endpoint: ${{ secrets.AZURE_OPENAI_ENDPOINT }}
azure_openai_version: ${{ secrets.AZURE_OPENAI_VERSION }}
openai_model: ${{ secrets.OPENAI_MODEL }}
pull_request_url: https://pr${{ env.PR_NUMBER }}.api.kerberos.lol
overwrite_description: true

130
.github/workflows/release-create.yml vendored Normal file
View File

@@ -0,0 +1,130 @@
name: Create a new release
on:
release:
types: [created]
workflow_dispatch:
inputs:
tag:
description: "Tag for the Docker image"
required: true
default: "test"
env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Build and push Docker image
run: |
docker tag ${{matrix.architecture}} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
docker push $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Build and push Docker image
run: |
docker tag ${{matrix.architecture}} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
docker push $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
create-manifest:
runs-on: ubuntu-24.04
needs: [build-amd64, build-arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Create and push multi-arch manifest
run: |
docker manifest create $REPO:${{ github.event.inputs.tag || github.ref_name }} \
$REPO-arch:arch-amd64-${{github.event.inputs.tag || github.ref_name}} \
$REPO-arch:arch-arm64-${{github.event.inputs.tag || github.ref_name}}
docker manifest push $REPO:${{ github.event.inputs.tag || github.ref_name }}
- name: Create and push latest manifest
run: |
docker manifest create $REPO:latest \
$REPO-arch:arch-amd64-${{github.event.inputs.tag || github.ref_name}} \
$REPO-arch:arch-arm64-${{github.event.inputs.tag || github.ref_name}}
docker manifest push $REPO:latest
if: github.event.inputs.tag == 'test'
create-release:
runs-on: ubuntu-24.04
needs: [build-amd64, build-arm64]
permissions:
contents: write
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-*.tar/agent-*.tar"

2
.gitignore vendored
View File

@@ -1,6 +1,8 @@
ui/node_modules
ui/build
ui/public/assets/env.js
.DS_Store
__debug*
.idea
machinery/www
yarn.lock

View File

@@ -1,7 +1,7 @@
ARG BASE_IMAGE_VERSION=70ec57e
ARG BASE_IMAGE_VERSION=amd64-ddbe40e
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=Kerberos.io
LABEL AUTHOR=uug.ai
ENV GOROOT=/usr/local/go
ENV GOPATH=/go

138
Dockerfile.arm64 Normal file
View File

@@ -0,0 +1,138 @@
ARG BASE_IMAGE_VERSION=arm64-ddbe40e
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=uug.ai
ENV GOROOT=/usr/local/go
ENV GOPATH=/go
ENV PATH=$GOPATH/bin:$GOROOT/bin:/usr/local/lib:$PATH
ENV GOSUMDB=off
##########################################
# Installing some additional dependencies.
# The package index is refreshed first so the upgrade and install steps
# resolve against current package lists; the lists are removed afterwards
# to keep the layer small.
RUN apt-get update && apt-get upgrade -y && apt-get install -y --fix-missing --no-install-recommends \
git build-essential cmake pkg-config unzip libgtk2.0-dev \
curl ca-certificates libcurl4-openssl-dev libssl-dev libjpeg62-turbo-dev && \
rm -rf /var/lib/apt/lists/*
##############################################################################
# Copy all the relevant source code in the Docker image, so we can build this.
RUN mkdir -p /go/src/github.com/kerberos-io/agent
COPY machinery /go/src/github.com/kerberos-io/agent/machinery
RUN rm -rf /go/src/github.com/kerberos-io/agent/machinery/.env
##################################################################
# Get the latest commit hash, so we know which version we're running
COPY .git /go/src/github.com/kerberos-io/agent/.git
RUN cd /go/src/github.com/kerberos-io/agent/.git && git log --format="%H" -n 1 | head -c7 > /go/src/github.com/kerberos-io/agent/machinery/version
RUN cat /go/src/github.com/kerberos-io/agent/machinery/version
##################
# Build Machinery
RUN cd /go/src/github.com/kerberos-io/agent/machinery && \
go mod download && \
go build -tags timetzdata,netgo,osusergo --ldflags '-s -w -extldflags "-static -latomic"' main.go && \
mkdir -p /agent && \
mv main /agent && \
mv version /agent && \
mv data /agent && \
mkdir -p /agent/data/cloud && \
mkdir -p /agent/data/snapshots && \
mkdir -p /agent/data/log && \
mkdir -p /agent/data/recordings && \
mkdir -p /agent/data/capture-test && \
mkdir -p /agent/data/config
####################################
# Let's create a /dist folder containing just the files necessary for runtime.
# Later, it will be copied as the / (root) of the output image.
WORKDIR /dist
RUN cp -r /agent ./
####################################################################################
# This will collect dependent libraries so they're later copied to the final image.
RUN /dist/agent/main version
FROM node:18.14.0-alpine3.16 AS build-ui
RUN apk update && apk upgrade --available && sync
########################
# Build Web (React app)
RUN mkdir -p /go/src/github.com/kerberos-io/agent/machinery/www
COPY ui /go/src/github.com/kerberos-io/agent/ui
RUN cd /go/src/github.com/kerberos-io/agent/ui && rm -rf yarn.lock && yarn config set network-timeout 300000 && \
yarn && yarn build
####################################
# Let's create a /dist folder containing just the files necessary for runtime.
# Later, it will be copied as the / (root) of the output image.
WORKDIR /dist
RUN mkdir -p ./agent && cp -r /go/src/github.com/kerberos-io/agent/machinery/www ./agent/
############################################
# Publish main binary to GitHub release
FROM alpine:latest
############################
# Protect by non-root user.
RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent video
#################################
# Copy files from previous images
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent
RUN mv /agent/* /home/agent/
RUN /home/agent/main version
#######################
# Make template config
RUN cp /home/agent/data/config/config.json /home/agent/data/config.template.json
###########################
# Set permissions correctly
RUN chown -R agent:kerberosio /home/agent/data
RUN chown -R agent:kerberosio /home/agent/www
###########################
# Grant the necessary root capabilities to the process trying to bind to the privileged port
RUN apk add libcap && setcap 'cap_net_bind_service=+ep' /home/agent/main
###################
# Run non-root user
USER agent
######################################
# By default the app runs on port 80
EXPOSE 80
######################################
# Check if agent is still running
HEALTHCHECK CMD curl --fail http://localhost:80 || exit 1
###################################################
# Leeeeettttt'ssss goooooo!!!
# Run the shizzle from the right working directory.
WORKDIR /home/agent
CMD ["./main", "-action", "run", "-port", "80"]

View File

@@ -208,6 +208,8 @@ Next to attaching the configuration file, it is also possible to override the co
| `AGENT_REGION_POLYGON` | A single polygon set for motion detection: "x1,y1;x2,y2;x3,y3;... | "" |
| `AGENT_CAPTURE_IPCAMERA_RTSP` | Full-HD RTSP endpoint to the camera you're targeting. | "" |
| `AGENT_CAPTURE_IPCAMERA_SUB_RTSP` | Sub-stream RTSP endpoint used for livestreaming (WebRTC). | "" |
| `AGENT_CAPTURE_IPCAMERA_BASE_WIDTH` | Force a specific width resolution for live view processing. | "" |
| `AGENT_CAPTURE_IPCAMERA_BASE_HEIGHT` | Force a specific height resolution for live view processing. | "" |
| `AGENT_CAPTURE_IPCAMERA_ONVIF` | Mark as a compliant ONVIF device. | "" |
| `AGENT_CAPTURE_IPCAMERA_ONVIF_XADDR` | ONVIF endpoint/address running on the camera. | "" |
| `AGENT_CAPTURE_IPCAMERA_ONVIF_USERNAME` | ONVIF username to authenticate against. | "" |

View File

@@ -14,7 +14,9 @@
"ipcamera": {
"rtsp": "",
"sub_rtsp": "",
"fps": ""
"fps": "",
"base_width": 640,
"base_height": 0
},
"usbcamera": {
"device": ""
@@ -120,4 +122,4 @@
"signing": {},
"realtimeprocessing": "false",
"realtimeprocessing_topic": ""
}
}

View File

@@ -26,6 +26,7 @@ require (
github.com/kerberos-io/joy4 v1.0.64
github.com/kerberos-io/onvif v1.0.0
github.com/minio/minio-go/v6 v6.0.57
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/pion/interceptor v0.1.40
github.com/pion/rtp v1.8.19
@@ -41,6 +42,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0
go.opentelemetry.io/otel/sdk v1.36.0
go.opentelemetry.io/otel/trace v1.36.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
@@ -118,7 +120,6 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
golang.org/x/arch v0.16.0 // indirect
golang.org/x/crypto v0.38.0 // indirect

View File

@@ -847,6 +847,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=

View File

@@ -101,31 +101,35 @@ func main() {
switch action {
case "version":
log.Log.Info("main.Main(): You are currrently running Kerberos Agent " + VERSION)
{
log.Log.Info("main.Main(): You are currrently running Kerberos Agent " + VERSION)
}
case "discover":
// Convert duration to int
timeout, err := time.ParseDuration(timeout + "ms")
if err != nil {
log.Log.Fatal("main.Main(): could not parse timeout: " + err.Error())
return
{
// Convert duration to int
timeout, err := time.ParseDuration(timeout + "ms")
if err != nil {
log.Log.Fatal("main.Main(): could not parse timeout: " + err.Error())
return
}
onvif.Discover(timeout)
}
onvif.Discover(timeout)
case "decrypt":
log.Log.Info("main.Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
symmetricKey := []byte(flag.Arg(1))
{
log.Log.Info("main.Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
symmetricKey := []byte(flag.Arg(1))
if symmetricKey == nil || len(symmetricKey) == 0 {
log.Log.Fatal("main.Main(): symmetric key should not be empty")
return
}
if len(symmetricKey) != 32 {
log.Log.Fatal("main.Main(): symmetric key should be 32 bytes")
return
}
if len(symmetricKey) == 0 {
log.Log.Fatal("main.Main(): symmetric key should not be empty")
return
}
if len(symmetricKey) != 32 {
log.Log.Fatal("main.Main(): symmetric key should be 32 bytes")
return
}
utils.Decrypt(flag.Arg(0), symmetricKey)
utils.Decrypt(flag.Arg(0), symmetricKey)
}
case "run":
{
@@ -213,6 +217,8 @@ func main() {
routers.StartWebserver(configDirectory, &configuration, &communication, &capture)
}
default:
log.Log.Error("main.Main(): Sorry I don't understand :(")
{
log.Log.Error("main.Main(): Sorry I don't understand :(")
}
}
}

View File

@@ -85,12 +85,8 @@ type Golibrtsp struct {
Streams []packets.Stream
// FPS calculation fields
lastFrameTime time.Time
frameTimeBuffer []time.Duration
frameBufferSize int
frameBufferIndex int
fpsMutex sync.Mutex
// Per-stream FPS calculation (keyed by stream index)
fpsTrackers map[int8]*fpsTracker
// I-frame interval tracking fields
packetsSinceLastKeyframe int
@@ -101,6 +97,78 @@ type Golibrtsp struct {
keyframeMutex sync.Mutex
}
// fpsTracker holds per-stream state for PTS-based FPS calculation.
// Each video stream (H264 / H265) gets its own tracker so PTS
// samples from different codecs never interleave.
//
// Frame intervals are stored in a fixed-size ring buffer; the FPS estimate
// is one second divided by the average of the recorded intervals. All
// fields are guarded by mu.
type fpsTracker struct {
	mu              sync.Mutex
	lastPTS         time.Duration   // PTS of the previously recorded frame
	hasPTS          bool            // false until the first sample has been seen
	frameTimeBuffer []time.Duration // ring buffer of frame intervals; zero means "slot unused"
	bufferSize      int             // number of slots in frameTimeBuffer
	bufferIndex     int             // next slot to overwrite
	cachedFPS       float64         // latest computed FPS
}

// newFPSTracker returns a tracker that averages over the last bufferSize
// frame intervals. A bufferSize below 1 is clamped to 1: a negative size
// would panic in make, and a size of 0 would leave an empty ring buffer
// that update cannot index into.
func newFPSTracker(bufferSize int) *fpsTracker {
	if bufferSize < 1 {
		bufferSize = 1
	}
	return &fpsTracker{
		frameTimeBuffer: make([]time.Duration, bufferSize),
		bufferSize:      bufferSize,
	}
}

// update records a new PTS sample and returns the latest FPS estimate.
// It must be called once per complete decoded frame (after Decode()
// succeeds), not on every RTP packet fragment.
//
// The first call only stores pts and returns 0. Later calls return the
// previously cached estimate whenever the new interval is rejected.
func (ft *fpsTracker) update(pts time.Duration) float64 {
	ft.mu.Lock()
	defer ft.mu.Unlock()

	if !ft.hasPTS {
		ft.lastPTS = pts
		ft.hasPTS = true
		return 0
	}

	interval := pts - ft.lastPTS
	// lastPTS always advances, even for rejected intervals, so a single
	// discontinuity does not poison every following sample.
	ft.lastPTS = pts

	// Skip invalid intervals (zero, negative, or very large which
	// indicate a PTS discontinuity or wrap).
	if interval <= 0 || interval > 5*time.Second {
		return ft.cachedFPS
	}

	ft.frameTimeBuffer[ft.bufferIndex] = interval
	ft.bufferIndex = (ft.bufferIndex + 1) % ft.bufferSize

	// Average only the slots that have been filled so far.
	var totalInterval time.Duration
	validSamples := 0
	for _, iv := range ft.frameTimeBuffer {
		if iv > 0 {
			totalInterval += iv
			validSamples++
		}
	}
	if validSamples == 0 {
		return ft.cachedFPS
	}

	avgInterval := totalInterval / time.Duration(validSamples)
	if avgInterval == 0 {
		return ft.cachedFPS
	}

	ft.cachedFPS = float64(time.Second) / float64(avgInterval)
	return ft.cachedFPS
}

// fps returns the most recent FPS estimate without recording a new sample.
func (ft *fpsTracker) fps() float64 {
	ft.mu.Lock()
	defer ft.mu.Unlock()
	return ft.cachedFPS
}
// Init function
var H264FrameDecoder *Decoder
var H265FrameDecoder *Decoder
@@ -548,18 +616,17 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
if len(rtppkt.Payload) > 0 {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
// decode timestamps — validate each call separately
pts, okPTS := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, okPTS2 := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !okPTS2 {
log.Log.Debug("capture.golibrtsp.Start(): unable to get PTS2 from PacketPTS2")
return
}
// Extract access units from RTP packets
// We need to do this, because the decoder expects a full
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
// Extract access units from RTP packets.
// We need a complete access unit to determine whether
// this is a keyframe.
au, errDecode := g.VideoH264Decoder.Decode(rtppkt)
if errDecode != nil {
if errDecode != rtph264.ErrNonStartingPacketAndNoPrevious && errDecode != rtph264.ErrMorePacketsNeeded {
@@ -568,6 +635,18 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}
// Frame is complete — update per-stream FPS from PTS.
if okPTS {
ft := g.fpsTrackers[g.VideoH264Index]
if ft == nil {
ft = newFPSTracker(30)
g.fpsTrackers[g.VideoH264Index] = ft
}
if ptsFPS := ft.update(pts); ptsFPS > 0 && ptsFPS <= 120 {
g.Streams[g.VideoH264Index].FPS = ptsFPS
}
}
// We'll need to read out a few things.
// prepend an AUD. This is required by some players
filteredAU = [][]byte{
@@ -578,8 +657,10 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
nonIDRPresent := false
idrPresent := false
var naluTypes []string
for _, nalu := range au {
typ := h264.NALUType(nalu[0] & 0x1F)
naluTypes = append(naluTypes, fmt.Sprintf("%s(%d,sz=%d)", typ.String(), int(typ), len(nalu)))
switch typ {
case h264.NALUTypeAccessUnitDelimiter:
continue
@@ -626,6 +707,11 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}
if idrPresent {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): IDR frame NALUs: [%s]",
streamType, fmt.Sprintf("%v", naluTypes)))
}
// Convert to packet.
enc, err := h264.AnnexBMarshal(filteredAU)
if err != nil {
@@ -651,10 +737,14 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
keyframeInterval := g.trackKeyframeInterval(idrPresent)
if idrPresent && keyframeInterval > 0 {
avgInterval := g.getAverageKeyframeInterval()
gopDuration := float64(keyframeInterval) / g.Streams[g.VideoH265Index].FPS
fps := g.Streams[g.VideoH264Index].FPS
if fps <= 0 {
fps = 25.0 // Default fallback FPS
}
gopDuration := float64(keyframeInterval) / fps
gopSize := int(avgInterval) // Store GOP size in a separate variable
g.Streams[g.VideoH264Index].GopSize = gopSize
log.Log.Info(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
streamType, keyframeInterval, avgInterval, gopDuration, gopSize))
preRecording := configuration.Config.Capture.PreRecording
if preRecording > 0 && int(gopDuration) > 0 {
@@ -716,18 +806,17 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
if len(rtppkt.Payload) > 0 {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
// decode timestamps — validate each call separately
pts, okPTS := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, okPTS2 := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !okPTS2 {
log.Log.Debug("capture.golibrtsp.Start(): unable to get PTS")
return
}
// Extract access units from RTP packets
// We need to do this, because the decoder expects a full
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
// Extract access units from RTP packets.
// We need a complete access unit to determine whether
// this is a keyframe.
au, errDecode := g.VideoH265Decoder.Decode(rtppkt)
if errDecode != nil {
if errDecode != rtph265.ErrNonStartingPacketAndNoPrevious && errDecode != rtph265.ErrMorePacketsNeeded {
@@ -736,6 +825,18 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}
// Frame is complete — update per-stream FPS from PTS.
if okPTS {
ft := g.fpsTrackers[g.VideoH265Index]
if ft == nil {
ft = newFPSTracker(30)
g.fpsTrackers[g.VideoH265Index] = ft
}
if ptsFPS := ft.update(pts); ptsFPS > 0 && ptsFPS <= 120 {
g.Streams[g.VideoH265Index].FPS = ptsFPS
}
}
filteredAU = [][]byte{
{byte(h265.NALUType_AUD_NUT) << 1, 1, 0x50},
}
@@ -796,10 +897,14 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
keyframeInterval := g.trackKeyframeInterval(isRandomAccess)
if isRandomAccess && keyframeInterval > 0 {
avgInterval := g.getAverageKeyframeInterval()
gopDuration := float64(keyframeInterval) / g.Streams[g.VideoH265Index].FPS
fps := g.Streams[g.VideoH265Index].FPS
if fps <= 0 {
fps = 25.0 // Default fallback FPS
}
gopDuration := float64(keyframeInterval) / fps
gopSize := int(avgInterval) // Store GOP size in a separate variable
g.Streams[g.VideoH265Index].GopSize = gopSize
log.Log.Info(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
streamType, keyframeInterval, avgInterval, gopDuration, gopSize))
preRecording := configuration.Config.Capture.PreRecording
if preRecording > 0 && int(gopDuration) > 0 {
@@ -1179,10 +1284,11 @@ func WriteMPEG4Audio(forma *format.MPEG4Audio, aus [][]byte) ([]byte, error) {
// Initialize FPS calculation buffers
func (g *Golibrtsp) initFPSCalculation() {
g.frameBufferSize = 30 // Store last 30 frame intervals
g.frameTimeBuffer = make([]time.Duration, g.frameBufferSize)
g.frameBufferIndex = 0
g.lastFrameTime = time.Time{}
// Ensure the per-stream FPS trackers map exists. Individual trackers
// can be created lazily when a given stream index is first used.
if g.fpsTrackers == nil {
g.fpsTrackers = make(map[int8]*fpsTracker)
}
// Initialize I-frame interval tracking
g.keyframeBufferSize = 10 // Store last 10 keyframe intervals
@@ -1192,50 +1298,11 @@ func (g *Golibrtsp) initFPSCalculation() {
g.lastKeyframePacketCount = 0
}
// calculateFPSFromTimestamps estimates the frame rate from the wall-clock
// time between successive calls. Each call stores the time elapsed since the
// previous call in a ring buffer and returns the reciprocal of the mean of
// all non-zero stored intervals. It returns 0 on the very first call and
// whenever no usable interval is available.
func (g *Golibrtsp) calculateFPSFromTimestamps() float64 {
	g.fpsMutex.Lock()
	defer g.fpsMutex.Unlock()

	now := time.Now()
	previous := g.lastFrameTime
	g.lastFrameTime = now

	// First call: there is no earlier timestamp to measure against.
	if previous.IsZero() {
		return 0
	}

	// Record the newest interval, overwriting the oldest slot.
	g.frameTimeBuffer[g.frameBufferIndex] = now.Sub(previous)
	g.frameBufferIndex = (g.frameBufferIndex + 1) % g.frameBufferSize

	// Sum the slots that hold a measurement; empty slots are zero.
	var sum time.Duration
	count := 0
	for i := range g.frameTimeBuffer {
		if d := g.frameTimeBuffer[i]; d > 0 {
			sum += d
			count++
		}
	}
	if count == 0 {
		return 0
	}

	mean := sum / time.Duration(count)
	if mean == 0 {
		return 0
	}
	return float64(time.Second) / float64(mean)
}
// Get enhanced FPS information from SPS with fallback
// Get enhanced FPS information from SPS with fallback to PTS-based calculation.
// The PTS-based FPS is computed per completed frame via fpsTracker.update(),
// so by the time this is called we already have a good estimate.
func (g *Golibrtsp) getEnhancedFPS(sps *h264.SPS, streamIndex int8) float64 {
// First try to get FPS from SPS
// First try to get FPS from SPS VUI parameters
spsFPS := sps.FPS()
// Check if SPS FPS is reasonable (between 1 and 120 fps)
@@ -1244,11 +1311,13 @@ func (g *Golibrtsp) getEnhancedFPS(sps *h264.SPS, streamIndex int8) float64 {
return spsFPS
}
// Fallback to timestamp-based calculation
timestampFPS := g.calculateFPSFromTimestamps()
if timestampFPS > 0 && timestampFPS <= 120 {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.getEnhancedFPS(): Timestamp FPS: %.2f", timestampFPS))
return timestampFPS
// Fallback to PTS-based FPS (already calculated per-frame)
if ft := g.fpsTrackers[streamIndex]; ft != nil {
ptsFPS := ft.fps()
if ptsFPS > 0 && ptsFPS <= 120 {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.getEnhancedFPS(): PTS FPS: %.2f", ptsFPS))
return ptsFPS
}
}
// Return SPS FPS even if it seems unreasonable, or default

View File

@@ -280,7 +280,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS)
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
@@ -500,7 +500,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
// Create a video file, and set the dimensions.
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS)
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
@@ -727,7 +727,7 @@ func VerifyCamera(c *gin.Context) {
}
}
func Base64Image(captureDevice *Capture, communication *models.Communication) string {
func Base64Image(captureDevice *Capture, communication *models.Communication, configuration *models.Configuration) string {
// We'll try to get a snapshot from the camera.
var queue *packets.Queue
var cursor *packets.QueueCursor
@@ -757,7 +757,8 @@ func Base64Image(captureDevice *Capture, communication *models.Communication) st
var img image.YCbCr
img, err = (*rtspClient).DecodePacket(pkt)
if err == nil {
bytes, _ := utils.ImageToBytes(&img)
imageResized, _ := utils.ResizeImage(&img, uint(configuration.Config.Capture.IPCamera.BaseWidth), uint(configuration.Config.Capture.IPCamera.BaseHeight))
bytes, _ := utils.ImageToBytes(imageResized)
encodedImage = base64.StdEncoding.EncodeToString(bytes)
break
} else {

View File

@@ -706,7 +706,8 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamSD(): Sending base64 encoded images to MQTT.")
img, err := rtspClient.DecodePacket(pkt)
if err == nil {
bytes, _ := utils.ImageToBytes(&img)
imageResized, _ := utils.ResizeImage(&img, uint(config.Capture.IPCamera.BaseWidth), uint(config.Capture.IPCamera.BaseHeight))
bytes, _ := utils.ImageToBytes(imageResized)
chunking := config.Capture.LiveviewChunking
@@ -803,6 +804,12 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
streams, _ := rtspClient.GetStreams()
videoTrack := webrtc.NewVideoTrack(streams)
audioTrack := webrtc.NewAudioTrack(streams)
if videoTrack == nil && audioTrack == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio tracks")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -864,7 +871,8 @@ func HandleRealtimeProcessing(processingCursor *packets.QueueCursor, configurati
log.Log.Info("cloud.RealtimeProcessing(): Sending base64 encoded images to MQTT.")
img, err := rtspClient.DecodePacket(pkt)
if err == nil {
bytes, _ := utils.ImageToBytes(&img)
imageResized, _ := utils.ResizeImage(&img, uint(config.Capture.IPCamera.BaseWidth), uint(config.Capture.IPCamera.BaseHeight))
bytes, _ := utils.ImageToBytes(imageResized)
encoded := base64.StdEncoding.EncodeToString(bytes)
valueMap := make(map[string]interface{})

View File

@@ -173,6 +173,21 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
configuration.Config.Capture.IPCamera.Width = width
configuration.Config.Capture.IPCamera.Height = height
// Set the liveview width and height, this is used for the liveview and motion regions (drawing on the hub).
baseWidth := config.Capture.IPCamera.BaseWidth
baseHeight := config.Capture.IPCamera.BaseHeight
// If the liveview height is not set, we will calculate it based on the width and aspect ratio of the camera.
if baseWidth > 0 && baseHeight == 0 {
widthAspectRatio := float64(baseWidth) / float64(width)
configuration.Config.Capture.IPCamera.BaseHeight = int(float64(height) * widthAspectRatio)
} else if baseHeight > 0 && baseWidth > 0 {
configuration.Config.Capture.IPCamera.BaseHeight = baseHeight
configuration.Config.Capture.IPCamera.BaseWidth = baseWidth
} else {
configuration.Config.Capture.IPCamera.BaseHeight = height
configuration.Config.Capture.IPCamera.BaseWidth = width
}
// Set the SPS and PPS values in the configuration.
configuration.Config.Capture.IPCamera.SPSNALUs = [][]byte{videoStream.SPS}
configuration.Config.Capture.IPCamera.PPSNALUs = [][]byte{videoStream.PPS}
@@ -226,6 +241,22 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
// Set config values as well
configuration.Config.Capture.IPCamera.SubWidth = width
configuration.Config.Capture.IPCamera.SubHeight = height
// If we have a substream, we need to set the width and height of the substream. (so we will override above information)
// Set the liveview width and height, this is used for the liveview and motion regions (drawing on the hub).
baseWidth := config.Capture.IPCamera.BaseWidth
baseHeight := config.Capture.IPCamera.BaseHeight
// If the liveview height is not set, we will calculate it based on the width and aspect ratio of the camera.
if baseWidth > 0 && baseHeight == 0 {
widthAspectRatio := float64(baseWidth) / float64(width)
configuration.Config.Capture.IPCamera.BaseHeight = int(float64(height) * widthAspectRatio)
} else if baseHeight > 0 && baseWidth > 0 {
configuration.Config.Capture.IPCamera.BaseHeight = baseHeight
configuration.Config.Capture.IPCamera.BaseWidth = baseWidth
} else {
configuration.Config.Capture.IPCamera.BaseHeight = height
configuration.Config.Capture.IPCamera.BaseWidth = width
}
}
// We are creating a queue to store the RTSP frames in, these frames will be
@@ -676,7 +707,7 @@ func MakeRecording(c *gin.Context, communication *models.Communication) {
// @Success 200
func GetSnapshotBase64(c *gin.Context, captureDevice *capture.Capture, configuration *models.Configuration, communication *models.Communication) {
// We'll try to get a snapshot from the camera.
base64Image := capture.Base64Image(captureDevice, communication)
base64Image := capture.Base64Image(captureDevice, communication, configuration)
if base64Image != "" {
communication.Image = base64Image
}
@@ -698,7 +729,8 @@ func GetSnapshotRaw(c *gin.Context, captureDevice *capture.Capture, configuratio
image := capture.JpegImage(captureDevice, communication)
// encode image to jpeg
bytes, _ := utils.ImageToBytes(&image)
imageResized, _ := utils.ResizeImage(&image, uint(configuration.Config.Capture.IPCamera.BaseWidth), uint(configuration.Config.Capture.IPCamera.BaseHeight))
bytes, _ := utils.ImageToBytes(imageResized)
// Return image/jpeg
c.Data(200, "image/jpeg", bytes)
@@ -713,7 +745,7 @@ func GetSnapshotRaw(c *gin.Context, captureDevice *capture.Capture, configuratio
// @Success 200
func GetConfig(c *gin.Context, captureDevice *capture.Capture, configuration *models.Configuration, communication *models.Communication) {
// We'll try to get a snapshot from the camera.
base64Image := capture.Base64Image(captureDevice, communication)
base64Image := capture.Base64Image(captureDevice, communication, configuration)
if base64Image != "" {
communication.Image = base64Image
}

View File

@@ -63,16 +63,34 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf
}
}
// A user might have set the base width and height for the IPCamera.
// This means also the polygon coordinates are set to a specific width and height (which might be different than the actual packets
// received from the IPCamera). So we will resize the polygon coordinates to the base width and height.
baseWidthRatio := 1.0
baseHeightRatio := 1.0
baseWidth := config.Capture.IPCamera.BaseWidth
baseHeight := config.Capture.IPCamera.BaseHeight
if baseWidth > 0 && baseHeight > 0 {
// We'll get the first image to calculate the ratio
img := imageArray[0]
if img != nil {
bounds := img.Bounds()
rows := bounds.Dy()
cols := bounds.Dx()
baseWidthRatio = float64(cols) / float64(baseWidth)
baseHeightRatio = float64(rows) / float64(baseHeight)
}
}
// Calculate mask
var polyObjects []geo.Polygon
if config.Region != nil {
for _, polygon := range config.Region.Polygon {
coords := polygon.Coordinates
poly := geo.Polygon{}
for _, c := range coords {
x := c.X
y := c.Y
x := c.X * baseWidthRatio
y := c.Y * baseHeightRatio
p := geo.NewPoint(x, y)
if !poly.Contains(p) {
poly.Add(p)

View File

@@ -239,7 +239,15 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
configuration.Config.Capture.IPCamera.SubRTSP = value
break
/* ONVIF connection settings */
/* Base width and height for the liveview and motion regions */
case "AGENT_CAPTURE_IPCAMERA_BASE_WIDTH":
configuration.Config.Capture.IPCamera.BaseWidth, _ = strconv.Atoi(value)
break
case "AGENT_CAPTURE_IPCAMERA_BASE_HEIGHT":
configuration.Config.Capture.IPCamera.BaseHeight, _ = strconv.Atoi(value)
break
/* ONVIF connection settings */
case "AGENT_CAPTURE_IPCAMERA_ONVIF":
configuration.Config.Capture.IPCamera.ONVIF = value
break
@@ -583,6 +591,10 @@ func StoreConfig(configDirectory string, config models.Config) error {
config.Encryption.PrivateKey = encryptionPrivateKey
}
// Reset the basewidth and baseheight
config.Capture.IPCamera.BaseWidth = 0
config.Capture.IPCamera.BaseHeight = 0
// Save into database
if os.Getenv("DEPLOYMENT") == "factory" || os.Getenv("MACHINERY_ENVIRONMENT") == "kubernetes" {
// Write to mongodb

View File

@@ -79,13 +79,18 @@ type Capture struct {
// IPCamera configuration, such as the RTSP url of the IPCamera and the FPS.
// Also includes ONVIF integration
type IPCamera struct {
RTSP string `json:"rtsp"`
Width int `json:"width"`
Height int `json:"height"`
FPS string `json:"fps"`
SubRTSP string `json:"sub_rtsp"`
SubWidth int `json:"sub_width"`
SubHeight int `json:"sub_height"`
RTSP string `json:"rtsp"`
Width int `json:"width"`
Height int `json:"height"`
FPS string `json:"fps"`
SubRTSP string `json:"sub_rtsp"`
SubWidth int `json:"sub_width"`
SubHeight int `json:"sub_height"`
BaseWidth int `json:"base_width"`
BaseHeight int `json:"base_height"`
SubFPS string `json:"sub_fps"`
ONVIF string `json:"onvif,omitempty" bson:"onvif"`
ONVIFXAddr string `json:"onvif_xaddr" bson:"onvif_xaddr"`

View File

@@ -15,7 +15,7 @@ import (
func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirectory string, configuration *models.Configuration, communication *models.Communication, captureDevice *capture.Capture) *gin.RouterGroup {
r.GET("/ws", func(c *gin.Context) {
websocket.WebsocketHandler(c, communication, captureDevice)
websocket.WebsocketHandler(c, configuration, communication, captureDevice)
})
// This is legacy should be removed in future! Now everything

View File

@@ -123,7 +123,6 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
opts.SetClientID(mqttClientID)
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): Set ClientID " + mqttClientID)
rand.Seed(time.Now().UnixNano())
webrtc.CandidateArrays = make(map[string](chan string))
opts.OnConnect = func(c mqtt.Client) {
// We managed to connect to the MQTT broker, hurray!
@@ -389,14 +388,6 @@ func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload models.P
// Copy the config, as we don't want to share the encryption part.
deepCopy := configuration.Config
// We need a fix for the width and height if a substream.
// The ROI requires the width and height of the sub stream.
if configuration.Config.Capture.IPCamera.SubRTSP != "" &&
configuration.Config.Capture.IPCamera.SubRTSP != configuration.Config.Capture.IPCamera.RTSP {
deepCopy.Capture.IPCamera.Width = configuration.Config.Capture.IPCamera.SubWidth
deepCopy.Capture.IPCamera.Height = configuration.Config.Capture.IPCamera.SubHeight
}
var configMap map[string]interface{}
inrec, _ := json.Marshal(deepCopy)
json.Unmarshal(inrec, &configMap)

View File

@@ -49,7 +49,7 @@ var upgrader = websocket.Upgrader{
},
}
func WebsocketHandler(c *gin.Context, communication *models.Communication, captureDevice *capture.Capture) {
func WebsocketHandler(c *gin.Context, configuration *models.Configuration, communication *models.Communication, captureDevice *capture.Capture) {
w := c.Writer
r := c.Request
conn, err := upgrader.Upgrade(w, r, nil)
@@ -112,7 +112,7 @@ func WebsocketHandler(c *gin.Context, communication *models.Communication, captu
ctx, cancel := context.WithCancel(context.Background())
sockets[clientID].Cancels["stream-sd"] = cancel
go ForwardSDStream(ctx, clientID, sockets[clientID], communication, captureDevice)
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
}
}
}
@@ -131,7 +131,7 @@ func WebsocketHandler(c *gin.Context, communication *models.Communication, captu
}
}
func ForwardSDStream(ctx context.Context, clientID string, connection *Connection, communication *models.Communication, captureDevice *capture.Capture) {
func ForwardSDStream(ctx context.Context, clientID string, connection *Connection, configuration *models.Configuration, communication *models.Communication, captureDevice *capture.Capture) {
var queue *packets.Queue
var cursor *packets.QueueCursor
@@ -159,7 +159,10 @@ logreader:
var img image.YCbCr
img, err = (*rtspClient).DecodePacket(pkt)
if err == nil {
bytes, _ := utils.ImageToBytes(&img)
config := configuration.Config
// Resize the image to the base width and height
imageResized, _ := utils.ResizeImage(&img, uint(config.Capture.IPCamera.BaseWidth), uint(config.Capture.IPCamera.BaseHeight))
bytes, _ := utils.ImageToBytes(imageResized)
encodedImage = base64.StdEncoding.EncodeToString(bytes)
} else {
continue

View File

@@ -21,6 +21,8 @@ import (
"github.com/kerberos-io/agent/machinery/src/encryption"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/nfnt/resize"
)
const VERSION = "3.5.0"
@@ -401,9 +403,31 @@ func Decrypt(directoryOrFile string, symmetricKey []byte) {
}
}
func ImageToBytes(img image.Image) ([]byte, error) {
func ImageToBytes(img *image.Image) ([]byte, error) {
buffer := new(bytes.Buffer)
w := bufio.NewWriter(buffer)
err := jpeg.Encode(w, img, &jpeg.Options{Quality: 15})
err := jpeg.Encode(w, *img, &jpeg.Options{Quality: 35})
log.Log.Debug("ImageToBytes() - buffer size: " + strconv.Itoa(buffer.Len()))
return buffer.Bytes(), err
}
// ResizeImage scales img to newWidth x newHeight using Lanczos3 resampling
// and returns a pointer to the resulting image.
//
// It returns an error when img is nil. Per the nfnt/resize documentation a
// dimension of 0 is derived from the other one so the source aspect ratio is
// kept — NOTE(review): confirm against the vendored library version.
func ResizeImage(img image.Image, newWidth uint, newHeight uint) (*image.Image, error) {
	if img == nil {
		return nil, errors.New("image is nil")
	}
	var resized image.Image = resize.Resize(newWidth, newHeight, img, resize.Lanczos3)
	return &resized, nil
}
// ResizeHeightWithAspectRatio returns newWidth together with the height that
// keeps the width:height ratio of the original dimensions (integer division,
// rounded towards zero). If any argument is not strictly positive the
// original width and height are returned unchanged.
func ResizeHeightWithAspectRatio(newWidth int, width int, height int) (int, int) {
	allPositive := newWidth > 0 && width > 0 && height > 0
	if !allPositive {
		return width, height
	}
	scaledHeight := newWidth * height / width
	return newWidth, scaledHeight
}

View File

@@ -22,73 +22,95 @@ import (
var LastPTS uint64 = 0 // Last PTS for the current segment
// FragmentDurationMs is the target duration for each fragment in milliseconds.
// Fragments will be flushed at the first keyframe after this duration has elapsed,
// resulting in ~3 second fragments (assuming a typical GOP interval).
const FragmentDurationMs = 3000
type MP4 struct {
// FileName is the name of the file
FileName string
width int
height int
Segments []*mp4ff.MediaSegment // List of media segments
Segment *mp4ff.MediaSegment
MultiTrackFragment *mp4ff.Fragment
TrackIDs []uint32
FileWriter *os.File
Writer *bufio.Writer
SegmentCount int
SampleCount int
StartPTS uint64
VideoTotalDuration uint64
AudioTotalDuration uint64
AudioPTS uint64
Start bool
SPSNALUs [][]byte // SPS NALUs for H264
PPSNALUs [][]byte // PPS NALUs for H264
VPSNALUs [][]byte // VPS NALUs for H264
FreeBoxSize int64
MoofBoxes int64 // Number of moof boxes in the file
MoofBoxSizes []int64 // Sizes of each moof box
StartTime uint64 // Start time of the MP4 file
VideoTrackName string // Name of the video track
VideoTrack int // Track ID for the video track
AudioTrackName string // Name of the audio track
AudioTrack int // Track ID for the audio track
VideoFullSample *mp4ff.FullSample // Full sample for video track
AudioFullSample *mp4ff.FullSample // Full sample for audio track
LastAudioSampleDTS uint64 // Last PTS for audio sample
LastVideoSampleDTS uint64 // Last PTS for video sample
SampleType string // Type of the sample (e.g., "video", "audio", "subtitle")
FileName string
width int
height int
Segments []*mp4ff.MediaSegment // List of media segments
Segment *mp4ff.MediaSegment
MultiTrackFragment *mp4ff.Fragment
TrackIDs []uint32
FileWriter *os.File
Writer *bufio.Writer
SegmentCount int
SampleCount int
StartPTS uint64
VideoTotalDuration uint64
AudioTotalDuration uint64
AudioPTS uint64
Start bool
SPSNALUs [][]byte // SPS NALUs for H264
PPSNALUs [][]byte // PPS NALUs for H264
VPSNALUs [][]byte // VPS NALUs for H264
FreeBoxSize int64
FragmentStartRawPTS uint64 // Raw PTS for timing when to flush fragments
FragmentStartDTS uint64 // Accumulated VideoTotalDuration at fragment start (matches tfdt)
MoofBoxes int64 // Number of moof boxes in the file
MoofBoxSizes []int64 // Sizes of each moof box
SegmentDurations []uint64 // Duration of each segment in timescale units
SegmentBaseDecTimes []uint64 // Base decode time of each segment
StartTime uint64 // Start time of the MP4 file
VideoTrackName string // Name of the video track
VideoTrack int // Track ID for the video track
AudioTrackName string // Name of the audio track
AudioTrack int // Track ID for the audio track
VideoFullSample *mp4ff.FullSample // Full sample for video track
AudioFullSample *mp4ff.FullSample // Full sample for audio track
LastAudioSampleDTS uint64 // Last PTS for audio sample
LastVideoSampleDTS uint64 // Last PTS for video sample
SampleType string // Type of the sample (e.g., "video", "audio", "subtitle")
TotalKeyframesReceived int // Total keyframes received by AddSampleToTrack
TotalKeyframesWritten int // Total keyframes written to trun boxes
FragmentKeyframeCount int // Keyframes in the current fragment
PendingSampleIsKeyframe bool // Whether the pending video sample is a keyframe
}
// NewMP4 creates a new MP4 object
func NewMP4(fileName string, spsNALUs [][]byte, ppsNALUs [][]byte, vpsNALUs [][]byte) *MP4 {
// NewMP4 creates a new MP4 object.
// maxDurationSec is the maximum expected recording duration in seconds,
// used to calculate the free-box placeholder size for ftyp+moov+sidx.
func NewMP4(fileName string, spsNALUs [][]byte, ppsNALUs [][]byte, vpsNALUs [][]byte, maxDurationSec int64) *MP4 {
init := mp4ff.NewMP4Init()
// Add a free box to the init segment
// Prepend a free box to the init segment with a size of 1000
freeBoxSize := 2048
// Calculate the placeholder size needed at the start of the file.
// Components:
// ftyp: ~32 bytes
// moov: ~1500 bytes (mvhd + mvex + video trak + audio trak + UUID)
// sidx: 24 bytes fixed + 12 bytes per segment reference
// Segments are ~FragmentDurationMs each, so:
// numSegments = ceil(maxDurationSec * 1000 / FragmentDurationMs) + 1 (safety margin)
// sidxSize = 24 + 12 * numSegments
baseSize := int64(2560) // ftyp + moov + extra headroom for large UUID signatures
numSegments := int64(0)
if maxDurationSec > 0 {
// Use integer ceiling division to avoid underestimating the number of segments.
numSegments = ((maxDurationSec*1000)+FragmentDurationMs-1)/FragmentDurationMs + 1
}
sidxSize := int64(24 + 12*numSegments)
freeBoxSize := int(baseSize + sidxSize)
free := mp4ff.NewFreeBox(make([]byte, freeBoxSize))
init.AddChild(free)
// Create a writer
ofd, err := os.Create(fileName)
if err != nil {
panic(err)
}
// Create a buffered writer
bufferedWriter := bufio.NewWriterSize(ofd, 64*1024) // 64KB buffer
// We will write the empty init segment to the file
// so we can overwrite it later with the actual init segment.
err = init.Encode(bufferedWriter)
// Write the free box placeholder at the start of the file
err = free.Encode(bufferedWriter)
if err != nil {
panic(err)
}
return &MP4{
FileName: fileName,
StartTime: uint64(time.Now().Unix()),
FreeBoxSize: int64(freeBoxSize),
FreeBoxSize: int64(freeBoxSize) + 8, // payload + 8 byte box header
FileWriter: ofd,
Writer: bufferedWriter,
SPSNALUs: spsNALUs,
@@ -132,41 +154,126 @@ func (mp4 *MP4) AddAudioTrack(codec string) uint32 {
// AddMediaSegment is currently a no-op: its body is empty and segNr is unused.
func (mp4 *MP4) AddMediaSegment(segNr int) {
}
// flushPendingVideoSample writes the pending video sample to the current fragment.
// If nextPTS is provided (non-zero), it calculates duration from the PTS difference.
// If nextPTS is 0 (e.g., at Close time), it uses the last known duration.
// Returns true if a sample was flushed, false if there was no pending sample
// (or no current fragment to write into).
//
// The pending sample is always cleared and the accumulated video duration is
// always advanced, even when adding the sample to the fragment fails; the
// failure is logged. Keyframe "written" counters are only updated when the
// sample was actually added, so they stay comparable with
// TotalKeyframesReceived.
func (mp4 *MP4) flushPendingVideoSample(nextPTS uint64) bool {
	if mp4.VideoFullSample == nil || mp4.MultiTrackFragment == nil {
		return false
	}

	var duration uint64
	if nextPTS > 0 && nextPTS > mp4.VideoFullSample.DecodeTime {
		duration = nextPTS - mp4.VideoFullSample.DecodeTime
	} else {
		// No valid nextPTS (Close case) or PTS went backwards (jitter/discontinuity)
		if nextPTS > 0 {
			log.Log.Warning(fmt.Sprintf("mp4.flushPendingVideoSample(): video PTS went backwards or zero duration (nextPTS=%d, prevDTS=%d), using last known duration", nextPTS, mp4.VideoFullSample.DecodeTime))
		}
		// Despite its name, LastVideoSampleDTS holds the previous sample's
		// duration (it is assigned from duration below).
		duration = mp4.LastVideoSampleDTS
		if duration == 0 {
			duration = 33 // Default ~30fps frame duration
		}
	}

	mp4.LastVideoSampleDTS = duration
	mp4.VideoTotalDuration += duration
	// Rewrite the decode time onto the accumulated timeline: the sample starts
	// where the total duration stood before this sample was added.
	mp4.VideoFullSample.DecodeTime = mp4.VideoTotalDuration - duration
	mp4.VideoFullSample.Sample.Dur = uint32(duration)

	isKF := mp4.PendingSampleIsKeyframe
	err := mp4.MultiTrackFragment.AddFullSampleToTrack(*mp4.VideoFullSample, uint32(mp4.VideoTrack))
	if err != nil {
		log.Log.Error("mp4.flushPendingVideoSample(): error adding sample: " + err.Error())
	} else if isKF {
		// Only count the keyframe as written when it really reached the fragment.
		mp4.TotalKeyframesWritten++
		mp4.FragmentKeyframeCount++
		log.Log.Debug(fmt.Sprintf("mp4.flushPendingVideoSample(): KEYFRAME WRITTEN to trun - totalWritten=%d, fragmentKF=%d, flags=0x%08x, dur=%d, DTS=%d",
			mp4.TotalKeyframesWritten, mp4.FragmentKeyframeCount, mp4.VideoFullSample.Sample.Flags, duration, mp4.VideoFullSample.DecodeTime))
	}

	mp4.VideoFullSample = nil
	mp4.PendingSampleIsKeyframe = false
	return true
}
func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, pts uint64) error {
if isKeyframe && trackID == uint32(mp4.VideoTrack) {
mp4.TotalKeyframesReceived++
elapsedDbg := uint64(0)
if mp4.Start {
elapsedDbg = pts - mp4.FragmentStartRawPTS
}
log.Log.Debug(fmt.Sprintf("mp4.AddSampleToTrack(): KEYFRAME #%d received - PTS=%d, size=%d, elapsed=%dms, started=%t, segment=%d, fragKF=%d",
mp4.TotalKeyframesReceived, pts, len(data), elapsedDbg, mp4.Start, mp4.SegmentCount, mp4.FragmentKeyframeCount))
}
if isKeyframe {
// Write the segment to the file
// Determine whether to start a new fragment.
// We only flush at a keyframe boundary once at least FragmentDurationMs
// of content has been accumulated, resulting in ~3 second fragments.
elapsed := uint64(0)
if mp4.Start {
mp4.MoofBoxes = mp4.MoofBoxes + 1
mp4.MoofBoxSizes = append(mp4.MoofBoxSizes, int64(mp4.Segment.Size()))
err := mp4.Segment.Encode(mp4.Writer)
if err != nil {
return err
elapsed = pts - mp4.FragmentStartRawPTS
}
shouldFlush := !mp4.Start || elapsed >= FragmentDurationMs
if shouldFlush {
// Write the previous segment to the file
if mp4.Start {
// IMPORTANT: Add any pending video sample to the current segment BEFORE flushing.
// This ensures the segment contains all frames up to (but not including) this keyframe,
// and the new segment will start cleanly with this keyframe.
if trackID == uint32(mp4.VideoTrack) {
mp4.flushPendingVideoSample(pts)
}
log.Log.Debug(fmt.Sprintf("mp4.AddSampleToTrack(): FLUSHING segment #%d - keyframes_in_fragment=%d, totalKF_received=%d, totalKF_written=%d",
mp4.SegmentCount, mp4.FragmentKeyframeCount, mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten))
mp4.MoofBoxes = mp4.MoofBoxes + 1
mp4.MoofBoxSizes = append(mp4.MoofBoxSizes, int64(mp4.Segment.Size()))
// Track the segment's duration and base decode time for sidx.
// Use accumulated VideoTotalDuration which matches the tfdt values
// in the trun boxes, NOT raw PTS from the camera.
segDuration := mp4.VideoTotalDuration - mp4.FragmentStartDTS
mp4.SegmentDurations = append(mp4.SegmentDurations, segDuration)
mp4.SegmentBaseDecTimes = append(mp4.SegmentBaseDecTimes, mp4.FragmentStartDTS)
err := mp4.Segment.Encode(mp4.Writer)
if err != nil {
log.Log.Error("mp4.AddSampleToTrack(): error encoding segment: " + err.Error())
}
mp4.Segments = append(mp4.Segments, mp4.Segment)
}
mp4.Segments = append(mp4.Segments, mp4.Segment)
mp4.Start = true
// Increment the segment count
mp4.SegmentCount = mp4.SegmentCount + 1
// Create a new media segment
seg := mp4ff.NewMediaSegment()
// Create a video fragment
multiTrackFragment, err := mp4ff.CreateMultiTrackFragment(uint32(mp4.SegmentCount), mp4.TrackIDs)
if err != nil {
log.Log.Error("mp4.AddSampleToTrack(): error creating multi track fragment: " + err.Error())
}
mp4.MultiTrackFragment = multiTrackFragment
seg.AddFragment(multiTrackFragment)
// Set to MP4 struct
mp4.Segment = seg
// Set the start PTS for the next segment
mp4.StartPTS = pts
mp4.FragmentStartRawPTS = pts
mp4.FragmentStartDTS = mp4.VideoTotalDuration
mp4.FragmentKeyframeCount = 0 // Reset keyframe counter for new fragment
}
mp4.Start = true
// Increment the segment count
mp4.SegmentCount = mp4.SegmentCount + 1
// Create a new media segment
seg := mp4ff.NewMediaSegment()
// Create a video fragment
multiTrackFragment, err := mp4ff.CreateMultiTrackFragment(uint32(mp4.SegmentCount), mp4.TrackIDs) // Assuming 1 for video track and 2 for audio track
if err != nil {
}
mp4.MultiTrackFragment = multiTrackFragment
seg.AddFragment(multiTrackFragment)
// Set to MP4 struct
mp4.Segment = seg
// Set the start PTS for the next segment
mp4.StartPTS = pts
}
if mp4.Start {
@@ -175,27 +282,18 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
var lengthPrefixed []byte
var err error
if mp4.VideoTrackName == "H264" || mp4.VideoTrackName == "AVC1" { // Convert Annex B to length-prefixed NAL units if H264
switch mp4.VideoTrackName {
case "H264", "AVC1": // Convert Annex B to length-prefixed NAL units if H264
lengthPrefixed, err = annexBToLengthPrefixed(data)
} else if mp4.VideoTrackName == "H265" || mp4.VideoTrackName == "HVC1" { // Convert H265 Annex B to length-prefixed NAL units
case "H265", "HVC1": // Convert H265 Annex B to length-prefixed NAL units
lengthPrefixed, err = annexBToLengthPrefixed(data)
}
if err == nil {
// Flush previous pending sample before storing the new one
if mp4.VideoFullSample != nil {
duration := pts - mp4.VideoFullSample.DecodeTime
log.Log.Debug("Adding sample to track " + fmt.Sprintf("%d, PTS: %d, Duration: %d, size: %d, Keyframe: %t", trackID, pts, duration, len(lengthPrefixed), isKeyframe))
mp4.LastVideoSampleDTS = duration
//fmt.Printf("Adding sample to track %d, PTS: %d, Duration: %d, size: %d, Keyframe: %t\n", trackID, pts, duration, len(mp4.VideoFullSample.Data), isKeyframe)
mp4.VideoTotalDuration += duration
mp4.VideoFullSample.DecodeTime = mp4.VideoTotalDuration - duration
mp4.VideoFullSample.Sample.Dur = uint32(duration)
err := mp4.MultiTrackFragment.AddFullSampleToTrack(*mp4.VideoFullSample, trackID)
if err != nil {
//log.Printf("Error adding sample to track %d: %v", trackID, err)
return err
}
log.Log.Debug("Adding sample to track " + fmt.Sprintf("%d, PTS: %d, size: %d, Keyframe: %t", trackID, pts, len(lengthPrefixed), isKeyframe))
mp4.flushPendingVideoSample(pts)
}
// Set the sample data
@@ -212,6 +310,7 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
CompositionTimeOffset: 0, // No composition time offset for video
}
mp4.VideoFullSample = &fullSample
mp4.PendingSampleIsKeyframe = isKeyframe
mp4.SampleType = "video"
}
} else if trackID == uint32(mp4.AudioTrack) {
@@ -261,50 +360,69 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
func (mp4 *MP4) Close(config *models.Config) {
// Add the last sample to the track, we will predict the duration based on the last sample
// We do not insert the last sample, as doing so might corrupt playback (we do not know the next PTS accurately).
// In theory it means we will lose the last sample, so there is millisecond dataloss, but it is better than corrupting playback.
// We could solve this by using a delayed packet reader that looks for the next (closest) PTS, but that would require a lot of memory and CPU.
/*duration := uint64(0)
trackID := uint32(1)
if mp4.SampleType == "video" {
duration = mp4.LastVideoSampleDTS
trackID = uint32(mp4.VideoTrack)
} else if mp4.SampleType == "audio" {
duration = 21 //mp4.LastAudioSampleDTS
} else {
log.Println("mp4.Close(): unknown sample type, cannot calculate duration")
}
if duration > 0 {
mp4.VideoTotalDuration += duration
mp4.VideoFullSample.DecodeTime = mp4.VideoTotalDuration - duration
mp4.VideoFullSample.Sample.Dur = uint32(duration)
err := mp4.MultiTrackFragment.AddFullSampleToTrack(*mp4.VideoFullSample, trackID)
if err != nil {
}
mp4.Segments = append(mp4.Segments, mp4.Segment)
}*/
log.Log.Info(fmt.Sprintf("mp4.Close(): KEYFRAME SUMMARY - totalReceived=%d, totalWritten=%d, segments=%d, lastFragmentKF=%d",
mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten, mp4.SegmentCount, mp4.FragmentKeyframeCount))
if mp4.VideoTotalDuration == 0 && mp4.AudioTotalDuration == 0 {
log.Log.Error("mp4.Close(): no video or audio samples added, cannot create MP4 file")
}
// Add final pending samples before closing
if mp4.Segment != nil {
// Add final video sample if pending (pass 0 as nextPTS to use last known duration)
mp4.flushPendingVideoSample(0)
// Add final audio sample if pending
if mp4.AudioFullSample != nil && mp4.AudioTrack > 0 {
SplitAACFrame(mp4.AudioFullSample.Data, func(started bool, aac []byte) {
sampleToAdd := *mp4.AudioFullSample
dts := mp4.LastAudioSampleDTS
if dts == 0 {
dts = 1024 // Default AAC frame duration
}
mp4.AudioTotalDuration += dts
mp4.AudioPTS += dts
sampleToAdd.Data = aac[7:]
sampleToAdd.DecodeTime = mp4.AudioPTS - dts
sampleToAdd.Sample.Dur = uint32(dts)
sampleToAdd.Sample.Size = uint32(len(aac[7:]))
err := mp4.MultiTrackFragment.AddFullSampleToTrack(sampleToAdd, uint32(mp4.AudioTrack))
if err != nil {
log.Log.Error("mp4.Close(): error adding final audio sample: " + err.Error())
}
})
mp4.AudioFullSample = nil
}
}
// Encode the last segment
if mp4.Segment != nil {
// Track the last segment's size, duration and base decode time.
// Use accumulated VideoTotalDuration which matches tfdt values.
mp4.MoofBoxes = mp4.MoofBoxes + 1
mp4.MoofBoxSizes = append(mp4.MoofBoxSizes, int64(mp4.Segment.Size()))
lastSegDuration := mp4.VideoTotalDuration - mp4.FragmentStartDTS
if lastSegDuration == 0 {
lastSegDuration = mp4.LastVideoSampleDTS
}
mp4.SegmentDurations = append(mp4.SegmentDurations, lastSegDuration)
mp4.SegmentBaseDecTimes = append(mp4.SegmentBaseDecTimes, mp4.FragmentStartDTS)
err := mp4.Segment.Encode(mp4.Writer)
if err != nil {
panic(err)
log.Log.Error("mp4.Close(): error encoding last segment: " + err.Error())
}
}
mp4.Writer.Flush()
defer mp4.FileWriter.Close()
// Ensure all segment data is on disk before we overwrite the placeholder at offset 0.
if err := mp4.FileWriter.Sync(); err != nil {
log.Log.Error("mp4.Close(): error syncing file: " + err.Error())
}
// Now we have all the moof and mdat boxes written to the file.
// We can now generate the ftyp and moov boxes, and replace it with the free box we added earlier (size of 10008 bytes).
// We build the ftyp + moov init segment and write it at the start,
// overwriting the free box placeholder we reserved in NewMP4.
init := mp4ff.NewMP4Init()
// Create a new ftyp box
@@ -337,26 +455,29 @@ func (mp4 *MP4) Close(config *models.Config) {
init.Moov.AddChild(mvex)
// Add a track for the video
if mp4.VideoTrackName == "H264" || mp4.VideoTrackName == "AVC1" {
switch mp4.VideoTrackName {
case "H264", "AVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", mp4.SPSNALUs, mp4.PPSNALUs, includePS)
if err != nil {
//panic(err)
}
init.Moov.Traks[0].Tkhd.Duration = mp4.VideoTotalDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
init.Moov.Traks[0].Tkhd.Height = mp4ff.Fixed32(uint32(mp4.height) << 16)
init.Moov.Traks[0].Mdia.Hdlr.Name = "agent " + utils.VERSION
//init.Moov.Traks[0].Mdia.Mdhd.Duration = mp4.VideoTotalDuration
} else if mp4.VideoTrackName == "H265" || mp4.VideoTrackName == "HVC1" {
init.Moov.Traks[0].Mdia.Mdhd.Duration = mp4.VideoTotalDuration
case "H265", "HVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs, [][]byte{}, includePS)
if err != nil {
//panic(err)
}
init.Moov.Traks[0].Tkhd.Duration = mp4.VideoTotalDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
init.Moov.Traks[0].Tkhd.Height = mp4ff.Fixed32(uint32(mp4.height) << 16)
init.Moov.Traks[0].Mdia.Hdlr.Name = "agent " + utils.VERSION
//init.Moov.Traks[0].Mdia.Mdhd.Duration = mp4.VideoTotalDuration
init.Moov.Traks[0].Mdia.Mdhd.Duration = mp4.VideoTotalDuration
}
// Try adding audio track if available
@@ -372,11 +493,10 @@ func (mp4 *MP4) Close(config *models.Config) {
// Set the audio descriptor
err := init.Moov.Traks[1].SetAACDescriptor(29, audioSampleRate)
if err != nil {
//panic(err)
}
init.Moov.Traks[1].Tkhd.Duration = mp4.AudioTotalDuration
init.Moov.Traks[1].Mdia.Hdlr.Name = "agent " + utils.VERSION
//init.Moov.Traks[1].Mdia.Mdhd.Duration = mp4.AudioTotalDuration
init.Moov.Traks[1].Mdia.Mdhd.Duration = mp4.AudioTotalDuration
}
// Try adding subtitle track if available
@@ -454,127 +574,94 @@ func (mp4 *MP4) Close(config *models.Config) {
}
}
// We will also calculate the SIDX box, which is a segment index box that contains information about the segments in the file.
// This is useful for seeking in the file, and for streaming the file.
/*sidx := &mp4ff.SidxBox{
Version: 0,
Flags: 0,
ReferenceID: 0,
Timescale: videoTimescale,
EarliestPresentationTime: 0,
FirstOffset: 0,
SidxRefs: make([]mp4ff.SidxRef, 0),
}
referenceTrak := init.Moov.Trak
trex, ok := init.Moov.Mvex.GetTrex(referenceTrak.Tkhd.TrackID)
if !ok {
// We have an issue.
}
segDatas, err := findSegmentData(mp4.Segments, referenceTrak, trex)
if err != nil {
// We have an issue.
}
fillSidx(sidx, referenceTrak, segDatas, true)
// Add the SIDX box to the moov box
init.AddChild(sidx)*/
// Get a bit slice writer for the init segment
// Get a byte buffer of 10008 bytes to write the init segment
buffer := bytes.NewBuffer(make([]byte, 0))
init.Encode(buffer)
// The first 10008 bytes of the file is a free box, so we can read it and replace it with the moov box.
// The init box might not be 10008 bytes, so we need to read the first 10008 bytes and then replace it with the moov box.
// while the remaining bytes are for a new free box.
// Write the init segment at the beginning of the file, replacing the free box
if _, err := mp4.FileWriter.WriteAt(buffer.Bytes(), 0); err != nil {
panic(err)
}
// Calculate the remaining size for the free box
remainingSize := mp4.FreeBoxSize - int64(buffer.Len())
if remainingSize > 0 {
newFreeBox := mp4ff.NewFreeBox(make([]byte, remainingSize))
var freeBuf bytes.Buffer
if err := newFreeBox.Encode(&freeBuf); err != nil {
panic(err)
// Build a Segment Index (sidx) box so players can seek directly to any
// fragment without scanning the entire file.
if len(mp4.SegmentDurations) > 0 {
sidx := &mp4ff.SidxBox{
Version: 1,
Flags: 0,
ReferenceID: uint32(mp4.VideoTrack),
Timescale: videoTimescale,
EarliestPresentationTime: 0,
FirstOffset: 0,
SidxRefs: make([]mp4ff.SidxRef, 0, len(mp4.SegmentDurations)),
}
if _, err := mp4.FileWriter.WriteAt(freeBuf.Bytes(), int64(buffer.Len())); err != nil {
panic(err)
for i, dur := range mp4.SegmentDurations {
sidx.SidxRefs = append(sidx.SidxRefs, mp4ff.SidxRef{
ReferenceType: 0, // media reference
ReferencedSize: uint32(mp4.MoofBoxSizes[i]),
SubSegmentDuration: uint32(dur),
StartsWithSAP: 1,
SAPType: 1,
})
}
init.AddChild(sidx)
}
}
type segData struct {
startPos uint64
presentationTime uint64
baseDecodeTime uint64
dur uint32
size uint32
}
func fillSidx(sidx *mp4ff.SidxBox, refTrak *mp4ff.TrakBox, segDatas []segData, nonZeroEPT bool) {
ept := uint64(0)
if nonZeroEPT {
ept = segDatas[0].presentationTime
// Encode the ftyp + moov + sidx into a buffer to measure the total size.
// Then compute the correct sidx.FirstOffset (the gap between the end of
// the sidx box and the first moof, occupied by the trailing free box)
// and re-encode with the corrected value.
var initBuf bytes.Buffer
if err := init.Encode(&initBuf); err != nil {
log.Log.Error("mp4.Close(): error encoding init segment: " + err.Error())
}
sidx.Version = 1
sidx.Timescale = refTrak.Mdia.Mdhd.Timescale
sidx.ReferenceID = 1
sidx.EarliestPresentationTime = ept
sidx.FirstOffset = 0
sidx.SidxRefs = make([]mp4ff.SidxRef, 0, len(segDatas))
for _, segData := range segDatas {
size := segData.size
sidx.SidxRefs = append(sidx.SidxRefs, mp4ff.SidxRef{
ReferencedSize: size,
SubSegmentDuration: segData.dur,
StartsWithSAP: 1,
SAPType: 1,
})
}
}
initSize := int64(initBuf.Len())
// findSegmentData returns a slice of segment media data using a reference track.
func findSegmentData(segs []*mp4ff.MediaSegment, refTrak *mp4ff.TrakBox, trex *mp4ff.TrexBox) ([]segData, error) {
segDatas := make([]segData, 0, len(segs))
for _, seg := range segs {
var firstCompositionTimeOffest int64
dur := uint32(0)
var baseTime uint64
for fIdx, frag := range seg.Fragments {
for _, traf := range frag.Moof.Trafs {
tfhd := traf.Tfhd
if tfhd.TrackID == refTrak.Tkhd.TrackID { // Find track that gives sidx time values
if fIdx == 0 {
baseTime = traf.Tfdt.BaseMediaDecodeTime()
}
for i, trun := range traf.Truns {
trun.AddSampleDefaultValues(tfhd, trex)
samples := trun.GetSamples()
for j, sample := range samples {
if fIdx == 0 && i == 0 && j == 0 {
firstCompositionTimeOffest = int64(sample.CompositionTimeOffset)
}
dur += sample.Dur
}
}
// The sidx.FirstOffset is defined as the distance (in bytes) from the
// anchor point (first byte after the sidx box) to the first byte of
// the first referenced moof/mdat. Since sidx is the last box in init,
// the anchor point is at initSize, and the first moof is at FreeBoxSize.
if len(mp4.SegmentDurations) > 0 {
if mp4.FreeBoxSize < initSize {
// Avoid computing a negative offset and wrapping it to uint64.
log.Log.Error("mp4.Close(): FreeBoxSize is smaller than initSize; skipping sidx FirstOffset adjustment")
} else {
firstOffset := uint64(mp4.FreeBoxSize - initSize)
// Find the sidx we added and update its FirstOffset
for _, child := range init.Children {
if sidxBox, ok := child.(*mp4ff.SidxBox); ok {
sidxBox.FirstOffset = firstOffset
break
}
}
// Re-encode with the corrected FirstOffset (same size, no layout change)
initBuf.Reset()
if err := init.Encode(&initBuf); err != nil {
log.Log.Error("mp4.Close(): error re-encoding init segment: " + err.Error())
}
initSize = int64(initBuf.Len())
}
sd := segData{
startPos: seg.StartPos,
presentationTime: uint64(int64(baseTime) + firstCompositionTimeOffest),
baseDecodeTime: baseTime,
dur: dur,
size: uint32(seg.Size()),
}
segDatas = append(segDatas, sd)
}
return segDatas, nil
if initSize > mp4.FreeBoxSize {
log.Log.Error(fmt.Sprintf("mp4.Close(): init segment (%d bytes) exceeds reserved space (%d bytes), file may be corrupt", initSize, mp4.FreeBoxSize))
}
// Write the init segment at the beginning of the file, overwriting the free box placeholder.
if _, err := mp4.FileWriter.WriteAt(initBuf.Bytes(), 0); err != nil {
log.Log.Error("mp4.Close(): error writing init segment: " + err.Error())
}
// Fill any remaining reserved space with a new (smaller) free box so
// the byte offsets of the moof/mdat boxes that follow are preserved.
remainingSize := mp4.FreeBoxSize - initSize
if remainingSize >= 8 { // minimum box size is 8 bytes (header only)
newFree := mp4ff.NewFreeBox(make([]byte, remainingSize-8))
var freeBuf bytes.Buffer
if err := newFree.Encode(&freeBuf); err != nil {
log.Log.Error("mp4.Close(): error encoding free box: " + err.Error())
}
if _, err := mp4.FileWriter.WriteAt(freeBuf.Bytes(), initSize); err != nil {
log.Log.Error("mp4.Close(): error writing free box: " + err.Error())
}
}
if err := mp4.FileWriter.Sync(); err != nil {
log.Log.Error("mp4.Close(): error syncing file: " + err.Error())
}
mp4.FileWriter.Close()
}
// annexBToLengthPrefixed converts Annex B formatted H264 data (with start codes)
@@ -909,7 +996,7 @@ func SampleToAACSampleIndex(sampling int) int {
return i
}
}
panic("not Found AAC Sample Index")
return -1
}
func AACSampleIdxToSample(idx int) int {

View File

@@ -1,6 +1,7 @@
package webrtc
import (
"context"
"encoding/base64"
"encoding/json"
"io"
@@ -22,13 +23,105 @@ import (
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
var (
CandidatesMutex sync.Mutex
CandidateArrays map[string](chan string)
peerConnectionCount int64
peerConnections map[string]*pionWebRTC.PeerConnection
const (
// Channel buffer sizes
// candidateChannelBuffer is the capacity given to each per-session candidate channel.
candidateChannelBuffer = 100
// rtcpBufferSize is the length in bytes of the buffer used when reading RTCP from a sender.
rtcpBufferSize = 1500
// Timeouts and intervals
keepAliveTimeout = 15 * time.Second
// defaultTimeout is the duration the WebRTC Timer is created with.
defaultTimeout = 10 * time.Second
// Track identifiers
trackStreamID = "kerberos-stream"
)
// ConnectionManager manages WebRTC peer connections in a thread-safe manner.
//
// peerConnectionCount is accessed through the sync/atomic functions and is
// deliberately the first field: the first word of an allocated struct is the
// only position guaranteed to be 64-bit aligned on 32-bit platforms.
type ConnectionManager struct {
	// peerConnectionCount is the number of connected peers; it is only read
	// and written atomically.
	peerConnectionCount int64
	// mu guards candidateChannels and peerConnections.
	mu sync.RWMutex
	// candidateChannels maps a session key to its candidate channel.
	candidateChannels map[string]chan string
	// peerConnections maps a session ID to its peer connection wrapper.
	peerConnections map[string]*peerConnectionWrapper
}
// peerConnectionWrapper wraps a peer connection with the additional state
// needed to tear it down.
type peerConnectionWrapper struct {
// conn is the wrapped peer connection.
conn *pionWebRTC.PeerConnection
// cancelCtx cancels the context created for this connection; it is invoked
// by ConnectionManager.RemovePeerConnection when non-nil.
cancelCtx context.CancelFunc
// done is closed at the end of the connection's cleanup.
done chan struct{}
// closeOnce ensures the cleanup in the connection-state handler runs only once.
closeOnce sync.Once
}
// globalConnectionManager is the package-wide manager shared by all sessions.
var globalConnectionManager = NewConnectionManager()

// NewConnectionManager returns a ConnectionManager whose candidate-channel
// and peer-connection maps are initialised and empty.
func NewConnectionManager() *ConnectionManager {
	manager := new(ConnectionManager)
	manager.candidateChannels = map[string]chan string{}
	manager.peerConnections = map[string]*peerConnectionWrapper{}
	return manager
}
// GetOrCreateCandidateChannel returns the candidate channel registered for
// sessionKey. When none exists yet, a buffered channel with capacity
// candidateChannelBuffer is created, stored and returned. The lookup and the
// insertion happen under the manager's write lock.
func (cm *ConnectionManager) GetOrCreateCandidateChannel(sessionKey string) chan string {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	candidates, found := cm.candidateChannels[sessionKey]
	if !found {
		candidates = make(chan string, candidateChannelBuffer)
		cm.candidateChannels[sessionKey] = candidates
	}
	return candidates
}
// CloseCandidateChannel closes the candidate channel registered for
// sessionKey and removes it from the manager. Unknown keys are ignored.
//
// NOTE(review): the manager lock only covers the map; a sender that obtained
// the channel earlier and sends after this call would hit a closed channel —
// confirm callers cannot race with the close.
func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	candidates, found := cm.candidateChannels[sessionKey]
	if !found {
		return
	}
	delete(cm.candidateChannels, sessionKey)
	close(candidates)
}
// AddPeerConnection registers wrapper under sessionID while holding the
// manager's write lock. An existing entry for the same sessionID is
// overwritten; its cancel function is not called here.
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.peerConnections[sessionID] = wrapper
}
// RemovePeerConnection drops the wrapper registered for sessionID, if any.
// The wrapper's cancel function (when set) is invoked before the entry is
// deleted; unknown session IDs are ignored.
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	entry, found := cm.peerConnections[sessionID]
	if !found {
		return
	}
	if cancel := entry.cancelCtx; cancel != nil {
		cancel()
	}
	delete(cm.peerConnections, sessionID)
}
// GetPeerConnectionCount returns the current value of the peer connection
// counter, read atomically (the manager lock is not taken).
func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
return atomic.LoadInt64(&cm.peerConnectionCount)
}
// IncrementPeerCount atomically adds one to the peer connection counter and
// returns the new value.
func (cm *ConnectionManager) IncrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, 1)
}
// DecrementPeerCount atomically subtracts one from the peer connection
// counter and returns the new value. No lower bound is enforced here.
func (cm *ConnectionManager) DecrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, -1)
}
type WebRTC struct {
Name string
StunServers []string
@@ -46,7 +139,7 @@ func CreateWebRTC(name string, stunServers []string, turnServers []string, turnS
TurnServers: turnServers,
TurnServersUsername: turnServersUsername,
TurnServersCredential: turnServersCredential,
Timer: time.NewTimer(time.Second * 10),
Timer: time.NewTimer(defaultTimeout),
}
}
@@ -68,19 +161,14 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
// Set lock
CandidatesMutex.Lock()
_, ok := CandidateArrays[key]
if !ok {
CandidateArrays[key] = make(chan string, 100)
}
log.Log.Info("webrtc.main.HandleReceiveHDCandidates(): " + candidate.Candidate)
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
select {
case CandidateArrays[key] <- candidate.Candidate:
case ch <- candidate.Candidate:
default:
log.Log.Info("webrtc.main.HandleReceiveHDCandidates(): channel is full.")
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
}
CandidatesMutex.Unlock()
}
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
@@ -107,12 +195,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
CandidatesMutex.Lock()
_, ok := CandidateArrays[sessionKey]
if !ok {
CandidateArrays[sessionKey] = make(chan string, 100)
}
CandidatesMutex.Unlock()
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
// Set variables
hubKey := handshake.HubKey
@@ -178,81 +261,128 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if err == nil && peerConnection != nil {
var videoSender *pionWebRTC.RTPSender = nil
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding video track: " + err.Error())
// Create context for this connection
ctx, cancel := context.WithCancel(context.Background())
wrapper := &peerConnectionWrapper{
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
}
var videoSender *pionWebRTC.RTPSender = nil
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cancel()
return
}
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): video track is nil, skipping video")
}
// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil {
return
if videoSender != nil {
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): video RTCP reader stopped")
}()
rtcpBuf := make([]byte, rtcpBufferSize)
for {
select {
case <-ctx.Done():
return
default:
if _, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}
}
}()
}()
}
var audioSender *pionWebRTC.RTPSender = nil
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding audio track: " + err.Error())
} // Read incoming RTCP packets
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cancel()
return
}
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): audio track is nil, skipping audio")
}
// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := audioSender.Read(rtcpBuf); rtcpErr != nil {
return
if audioSender != nil {
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): audio RTCP reader stopped")
}()
rtcpBuf := make([]byte, rtcpBufferSize)
for {
select {
case <-ctx.Done():
return
default:
if _, _, rtcpErr := audioSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}
}
}()
}()
}
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
if connectionState == pionWebRTC.PeerConnectionStateDisconnected || connectionState == pionWebRTC.PeerConnectionStateClosed {
// Set lock
CandidatesMutex.Lock()
atomic.AddInt64(&peerConnectionCount, -1)
_, ok := CandidateArrays[sessionKey]
if ok {
close(CandidateArrays[sessionKey])
delete(CandidateArrays, sessionKey)
}
// Not really needed.
//senders := peerConnection.GetSenders()
//for _, sender := range senders {
// if err := peerConnection.RemoveTrack(sender); err != nil {
// log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while removing track: " + err.Error())
// }
//}
if err := peerConnection.Close(); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while closing peer connection: " + err.Error())
}
peerConnections[handshake.SessionID] = nil
delete(peerConnections, handshake.SessionID)
CandidatesMutex.Unlock()
} else if connectionState == pionWebRTC.PeerConnectionStateConnected {
CandidatesMutex.Lock()
atomic.AddInt64(&peerConnectionCount, 1)
CandidatesMutex.Unlock()
} else if connectionState == pionWebRTC.PeerConnectionStateFailed {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICEConnectionStateFailed")
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Number of peers connected (" + strconv.FormatInt(peerConnectionCount, 10) + ")")
// React to peer connection state transitions: keep the active peer counter
// up to date and release the per-session resources on teardown.
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
	// closeOnce guards the cleanup so it runs at most once even when both
	// the Disconnected and the Closed state are reported for this connection.
	wrapper.closeOnce.Do(func() {
		count := globalConnectionManager.DecrementPeerCount()
		// Format the counter as a decimal number; string(rune(count)) would
		// yield the Unicode code point with that value instead of its digits.
		log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
		// Clean up resources
		globalConnectionManager.CloseCandidateChannel(sessionKey)
		if err := peerConnection.Close(); err != nil {
			log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
		}
		globalConnectionManager.RemovePeerConnection(handshake.SessionID)
		// Signal that the cleanup for this connection has finished.
		close(wrapper.done)
	})
case pionWebRTC.PeerConnectionStateConnected:
	count := globalConnectionManager.IncrementPeerCount()
	log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
case pionWebRTC.PeerConnectionStateFailed:
	log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
}
})
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Iterate over the candidates and send them to the remote client
// Non-blocking channel
for candidate := range CandidateArrays[sessionKey] {
CandidatesMutex.Lock()
log.Log.Info(">>>> webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding candidate: " + candidateErr.Error())
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
CandidatesMutex.Unlock()
}
}()
@@ -270,22 +400,56 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate
var hasRelayCandidates bool
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
if candidate == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE gathering complete (candidate is nil)")
if !hasRelayCandidates {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): WARNING - No TURN (relay) candidates were gathered! TURN servers: " +
config.TURNURI + ", Username: " + config.TURNUsername + ", ForceTurn: " + config.ForceTurn)
}
return
}
// Log candidate details for debugging
candidateJSON := candidate.ToJSON()
candidateStr := candidateJSON.Candidate
// Determine candidate type from the candidate string
candidateType := "unknown"
if candidateJSON.Candidate != "" {
switch candidate.Typ {
case pionWebRTC.ICECandidateTypeRelay:
candidateType = "relay"
case pionWebRTC.ICECandidateTypeSrflx:
candidateType = "srflx"
case pionWebRTC.ICECandidateTypeHost:
candidateType = "host"
case pionWebRTC.ICECandidateTypePrflx:
candidateType = "prflx"
}
}
// Track if we received any relay (TURN) candidates
if candidateType == "relay" {
hasRelayCandidates = true
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE candidate received - Type: " + candidateType +
", Candidate: " + candidateStr)
// Create a config map
valueMap := make(map[string]interface{})
candateJSON := candidate.ToJSON()
candateBinary, err := json.Marshal(candateJSON)
candateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candateBinary)
// The SDP does not need to be sent.
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): something went wrong while marshalling candidate: " + err.Error())
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
}
// We'll send the candidate to the hub
@@ -305,8 +469,8 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
// Create a channel which will be used to send candidates to the other peer
peerConnections[handshake.SessionID] = peerConnection
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
if err == nil {
// Create a config map
@@ -339,7 +503,11 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// NewVideoTrack creates the local sample track that outgoing video is written
// to. The codec is always H264; the streams argument is not consulted.
//
// The track is created exactly once, using trackStreamID as its stream id. A
// creation failure is logged and reported to the caller as a nil track rather
// than being silently discarded.
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
	mimeType := pionWebRTC.MimeTypeH264
	outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
	if err != nil {
		log.Log.Error("webrtc.main.NewVideoTrack(): error creating video track: " + err.Error())
		return nil
	}
	return outboundVideoTrack
}
@@ -354,161 +522,245 @@ func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
mimeType = pionWebRTC.MimeTypePCMA
}
}
outboundAudioTrack, _ := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", "pion124")
if mimeType == "" {
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
return nil
}
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
if err != nil {
log.Log.Error("webrtc.main.NewAudioTrack(): error creating audio track: " + err.Error())
return nil
}
return outboundAudioTrack
}
// streamState holds the mutable state of one WriteToTrack run. It is created
// by WriteToTrack and passed by pointer to the helpers that read and update it.
type streamState struct {
// lastKeepAlive is compared against time.Now().Unix() by
// shouldContinueStreaming; it is refreshed from HandleLiveHDKeepalive.
lastKeepAlive int64
// peerCount is the most recent value parsed from HandleLiveHDPeers; zero
// pauses streaming when forwarding is enabled.
peerCount int64
// start is set by processVideoPacket once a video keyframe has been seen.
start bool
// receivedKeyFrame gates the packet loop: packets are skipped until a
// keyframe arrives, and the flag is cleared again on empty packets or when
// streaming is paused.
receivedKeyFrame bool
// lastAudioSample is the audio sample held back until the next audio packet
// arrives, so its Duration can be derived from the timestamp difference.
lastAudioSample *pionMedia.Sample
// lastVideoSample is the video counterpart of lastAudioSample.
lastVideoSample *pionMedia.Sample
}
// codecSupport records which codecs were found among the RTSP streams. Each
// flag corresponds to one stream name matched by detectCodecs.
type codecSupport struct {
// hasH264 is set when a stream named "H264" is present.
hasH264 bool
// hasPCM_MULAW is set when a stream named "PCM_MULAW" is present.
hasPCM_MULAW bool
// hasAAC is set when a stream named "AAC" is present.
hasAAC bool
// hasOpus is set when a stream named "OPUS" is present.
hasOpus bool
}
// detectCodecs reports which of the recognised stream names ("H264",
// "PCM_MULAW", "AAC", "OPUS") occur among the streams returned by rtspClient.
// Streams with any other name are ignored. The error returned by GetStreams is
// deliberately discarded; only the returned streams are inspected.
func detectCodecs(rtspClient capture.RTSPClient) codecSupport {
	var found codecSupport
	available, _ := rtspClient.GetStreams()
	for _, s := range available {
		name := s.Name
		// Each flag latches: once a codec has been seen it stays set.
		found.hasH264 = found.hasH264 || name == "H264"
		found.hasPCM_MULAW = found.hasPCM_MULAW || name == "PCM_MULAW"
		found.hasAAC = found.hasAAC || name == "AAC"
		found.hasOpus = found.hasOpus || name == "OPUS"
	}
	return found
}
// hasValidCodecs reports whether at least one supported codec was detected:
// H264 for video, or any of PCM_MULAW, AAC or Opus for audio.
func (cs codecSupport) hasValidCodecs() bool {
	switch {
	case cs.hasH264:
		// A usable video codec is enough on its own.
		return true
	case cs.hasPCM_MULAW, cs.hasAAC, cs.hasOpus:
		// Audio-only streams are accepted as well.
		return true
	}
	return false
}
// shouldContinueStreaming reports whether packets should currently be written.
// When config.Capture.ForwardWebRTC is not "true" the answer is always yes.
// Otherwise streaming continues only while at least one peer is registered and
// the last keepalive is no older than keepAliveTimeout.
func shouldContinueStreaming(config models.Config, state *streamState) bool {
	if forwarding := config.Capture.ForwardWebRTC == "true"; !forwarding {
		return true
	}
	if state.peerCount == 0 {
		return false
	}
	// Both sides are whole seconds since the Unix epoch.
	elapsed := time.Now().Unix() - state.lastKeepAlive
	return elapsed <= int64(keepAliveTimeout.Seconds())
}
// updateStreamState refreshes state from the keepalive and peer-count
// channels without blocking. At most one pending message is taken from each
// channel per call; a message that does not parse as a base-10 int64 is
// dropped and leaves the corresponding field untouched.
func updateStreamState(communication *models.Communication, state *streamState) {
	// poll does a single non-blocking receive and hands a successfully parsed
	// value to store.
	poll := func(src <-chan string, store func(int64)) {
		select {
		case raw := <-src:
			parsed, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return
			}
			store(parsed)
		default:
			// Nothing pending on this channel.
		}
	}

	poll(communication.HandleLiveHDKeepalive, func(v int64) { state.lastKeepAlive = v })
	poll(communication.HandleLiveHDPeers, func(v int64) { state.peerCount = v })
}
// writeFinalSamples flushes the samples still held in state to their tracks:
// video first, then audio. A sample is skipped when either it or its track is
// nil. Write errors other than io.ErrClosedPipe are logged.
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
	flush := func(track *pionWebRTC.TrackLocalStaticSample, pending *pionMedia.Sample, errPrefix string) {
		if pending == nil || track == nil {
			return
		}
		err := track.WriteSample(*pending)
		if err == nil || err == io.ErrClosedPipe {
			return
		}
		log.Log.Error(errPrefix + err.Error())
	}

	flush(videoTrack, state.lastVideoSample, "webrtc.main.writeFinalSamples(): error writing final video sample: ")
	flush(audioTrack, state.lastAudioSample, "webrtc.main.writeFinalSamples(): error writing final audio sample: ")
}
// processVideoPacket handles one video packet. Nothing is emitted until a
// keyframe has been seen. Each packet is buffered in state.lastVideoSample and
// only written when the next packet arrives, because its Duration is taken
// from the difference between the two packet timestamps (treated as
// milliseconds). When config.Capture.ForwardWebRTC is "true" the packet is
// dropped, since remote forwarding is not implemented.
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
	if videoTrack == nil {
		return
	}

	// Latch on the first keyframe; earlier frames are discarded.
	state.start = state.start || pkt.IsKeyFrame
	if !state.start {
		return
	}

	if config.Capture.ForwardWebRTC == "true" {
		// Remote forwarding not yet implemented
		log.Log.Debug("webrtc.main.processVideoPacket(): remote forwarding not implemented")
		return
	}

	current := &pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
	if previous := state.lastVideoSample; previous != nil {
		delta := current.PacketTimestamp - previous.PacketTimestamp
		previous.Duration = time.Duration(delta) * time.Millisecond
		err := videoTrack.WriteSample(*previous)
		if err != nil && err != io.ErrClosedPipe {
			log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
		}
	}
	state.lastVideoSample = current
}
// processAudioPacket handles one audio packet. Packets are dropped when there
// is no audio track or when the stream carries AAC, which is not transcoded
// yet. Otherwise the packet is buffered in state.lastAudioSample and written
// when the next packet arrives, with its Duration taken from the difference
// between the two packet timestamps (treated as milliseconds).
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
	switch {
	case audioTrack == nil:
		return
	case hasAAC:
		// AAC transcoding not yet implemented
		// TODO: Implement AAC to PCM_MULAW transcoding
		return
	}

	current := &pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
	if previous := state.lastAudioSample; previous != nil {
		delta := current.PacketTimestamp - previous.PacketTimestamp
		previous.Duration = time.Duration(delta) * time.Millisecond
		err := audioTrack.WriteSample(*previous)
		if err != nil && err != io.ErrClosedPipe {
			log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
		}
	}
	state.lastAudioSample = current
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
config := configuration.Config
// Make peerconnection map
peerConnections = make(map[string]*pionWebRTC.PeerConnection)
// Set the indexes for the video & audio streams
// Later when we read a packet we need to figure out which track to send it to.
hasH264 := false
hasPCM_MULAW := false
hasAAC := false
hasOpus := false
streams, _ := rtspClient.GetStreams()
for _, stream := range streams {
if stream.Name == "H264" {
hasH264 = true
} else if stream.Name == "PCM_MULAW" {
hasPCM_MULAW = true
} else if stream.Name == "AAC" {
hasAAC = true
} else if stream.Name == "OPUS" {
hasOpus = true
}
// Check if at least one track is available
if videoTrack == nil && audioTrack == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
return
}
if !hasH264 && !hasPCM_MULAW && !hasAAC && !hasOpus {
log.Log.Error("webrtc.main.WriteToTrack(): no valid video codec and audio codec found.")
} else {
if config.Capture.TranscodingWebRTC == "true" {
// Todo..
} else {
//log.Log.Info("webrtc.main.WriteToTrack(): not using a transcoder.")
}
// Detect available codecs
codecs := detectCodecs(rtspClient)
var cursorError error
var pkt packets.Packet
var lastAudioSample *pionMedia.Sample = nil
var lastVideoSample *pionMedia.Sample = nil
start := false
receivedKeyFrame := false
lastKeepAlive := "0"
peerCount := "0"
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
//if config.Capture.ForwardWebRTC != "true" && peerConnectionCount == 0 {
// start = false
// receivedKeyFrame = false
// continue
//}
select {
case lastKeepAlive = <-communication.HandleLiveHDKeepalive:
default:
}
select {
case peerCount = <-communication.HandleLiveHDPeers:
default:
}
now := time.Now().Unix()
lastKeepAliveN, _ := strconv.ParseInt(lastKeepAlive, 10, 64)
hasTimedOut := (now - lastKeepAliveN) > 15 // if longer then no response in 15 sec.
hasNoPeers := peerCount == "0"
if config.Capture.ForwardWebRTC == "true" && (hasTimedOut || hasNoPeers) {
start = false
receivedKeyFrame = false
continue
}
if len(pkt.Data) == 0 || pkt.Data == nil {
receivedKeyFrame = false
continue
}
if !receivedKeyFrame {
if pkt.IsKeyFrame {
receivedKeyFrame = true
} else {
continue
}
}
//if config.Capture.TranscodingWebRTC == "true" {
// We will transcode the video
// TODO..
//}
if pkt.IsVideo {
// Start at the first keyframe
if pkt.IsKeyFrame {
start = true
}
if start {
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
//sample = pionMedia.Sample{Data: pkt.Data, Duration: time.Second}
if config.Capture.ForwardWebRTC == "true" {
// We will send the video to a remote peer
// TODO..
} else {
if lastVideoSample != nil {
duration := sample.PacketTimestamp - lastVideoSample.PacketTimestamp
bufferDurationCasted := time.Duration(duration) * time.Millisecond
lastVideoSample.Duration = bufferDurationCasted
if err := videoTrack.WriteSample(*lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
}
}
lastVideoSample = &sample
}
}
} else if pkt.IsAudio {
// @TODO: We need to check if the audio is PCM_MULAW or AAC
// If AAC we need to transcode it to PCM_MULAW
// If PCM_MULAW we can send it directly.
if hasAAC {
// We will transcode the audio from AAC to PCM_MULAW
// Not sure how to do this yet, but we need to use a decoder
// and then encode it to PCM_MULAW.
// TODO..
//d := fdkaac.NewAacDecoder()
continue
}
// We will send the audio
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
if lastAudioSample != nil {
duration := sample.PacketTimestamp - lastAudioSample.PacketTimestamp
bufferDurationCasted := time.Duration(duration) * time.Millisecond
lastAudioSample.Duration = bufferDurationCasted
if err := audioTrack.WriteSample(*lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
}
}
lastAudioSample = &sample
}
}
if !codecs.hasValidCodecs() {
log.Log.Error("webrtc.main.WriteToTrack(): no valid video or audio codec found")
return
}
peerConnectionCount = 0
log.Log.Info("webrtc.main.WriteToTrack(): stop writing to track.")
if config.Capture.TranscodingWebRTC == "true" {
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
}
// Initialize streaming state
state := &streamState{
lastKeepAlive: time.Now().Unix(),
peerCount: 0,
}
defer func() {
writeFinalSamples(state, videoTrack, audioTrack)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
var pkt packets.Packet
var cursorError error
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if cursorError != nil {
break
}
// Update state from communication channels
updateStreamState(communication, state)
// Check if we should continue streaming
if !shouldContinueStreaming(config, state) {
state.start = false
state.receivedKeyFrame = false
continue
}
// Skip empty packets
if len(pkt.Data) == 0 || pkt.Data == nil {
state.receivedKeyFrame = false
continue
}
// Wait for first keyframe before processing
if !state.receivedKeyFrame {
if pkt.IsKeyFrame {
state.receivedKeyFrame = true
} else {
continue
}
}
// Process video or audio packets
if pkt.IsVideo {
processVideoPacket(pkt, state, videoTrack, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
}
}
}

View File

@@ -1715,10 +1715,10 @@
"@jridgewell/resolve-uri" "^3.0.3"
"@jridgewell/sourcemap-codec" "^1.4.10"
"@kerberos-io/ui@^1.71.0":
version "1.71.0"
resolved "https://registry.yarnpkg.com/@kerberos-io/ui/-/ui-1.71.0.tgz#06914c94e8b0982068d2099acf8158917a511bfc"
integrity sha512-pHCTn/iQTcQEPoCK82eJHGRn6BgzW3wgV4C+mNqdKOtLTquxL+vh7molEgC66tl3DGf7HyjSNa8LuoxYbt9TEg==
"@kerberos-io/ui@^1.76.0":
version "1.77.0"
resolved "https://registry.yarnpkg.com/@kerberos-io/ui/-/ui-1.77.0.tgz#b748b2a9abf793ff2a9ba64ee41f84debc0ca9dc"
integrity sha512-CHh4jeLKwrYvJRL5PM3UEN4p2k1fqwMKgSF2U6IR4v0fE2FwPc/2Ry4zGk6pvLDFHbDpR9jUkHX+iNphvStoyQ==
dependencies:
"@emotion/react" "^11.10.4"
"@emotion/styled" "^11.10.4"