Compare commits

...

15 Commits

Author SHA1 Message Date
Cédric Verstraeten
4fbee60e9f Merge pull request #261 from kerberos-io/feature/add-webrtc-aac-transcoder
feature/add-webrtc-aac-transcoder
2026-03-09 17:46:17 +01:00
Cédric Verstraeten
d6c25df280 Add missing imports for strconv and strings in AAC transcoder stub 2026-03-09 16:42:42 +00:00
Cédric Verstraeten
72a2d28e1e Update aac_transcoder_stub.go 2026-03-09 17:41:54 +01:00
Cédric Verstraeten
eb0972084f Implement AAC transcoding for WebRTC using FFmpeg; update Dockerfiles and launch configuration 2026-03-09 16:34:52 +00:00
Cédric Verstraeten
41a1d221fc Merge pull request #260 from kerberos-io/fix/set-clean-state
fix/set-clean-state
2026-03-09 16:56:36 +01:00
Cédric Verstraeten
eaacc93d2f Set MQTT clean session to true and disable resume subscriptions 2026-03-09 15:50:40 +00:00
Cédric Verstraeten
0e6a004c23 Merge pull request #259 from kerberos-io/fix/add-grace-period
feature/add-broadcasting-feature
2026-03-09 16:20:39 +01:00
Cédric Verstraeten
617f854534 Merge branch 'master' into fix/add-grace-period 2026-03-09 16:17:35 +01:00
Cédric Verstraeten
1bf8006055 Refactor WebRTC handling to use per-peer broadcasters for video and audio tracks 2026-03-09 15:12:01 +00:00
Cédric Verstraeten
ca0e426382 Add max signaling age constant and discard stale WebRTC messages 2026-03-09 14:50:00 +00:00
Cédric Verstraeten
726d0722d9 Merge pull request #258 from kerberos-io/fix/add-grace-period
fix/add-grace-period
2026-03-09 15:20:53 +01:00
Cédric Verstraeten
d8f320b040 Add disconnect grace period handling in WebRTC connection manager 2026-03-09 14:15:50 +00:00
Cédric Verstraeten
0131b87692 Merge pull request #257 from kerberos-io/security/middleware-exposure
security/middleware-exposure
2026-03-09 14:18:11 +01:00
Cédric Verstraeten
54e8198b65 Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 14:18:00 +01:00
Cédric Verstraeten
3bfb68f950 Update port configuration and secure routes with JWT authentication middleware 2026-03-09 12:42:05 +00:00
9 changed files with 956 additions and 144 deletions

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -800,17 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
// Create per-peer broadcasters instead of shared tracks.
// Each viewer gets its own track with independent, non-blocking writes
// so a slow/congested peer cannot stall the others.
streams, _ := rtspClient.GetStreams()
videoTrack := webrtc.NewVideoTrack(streams)
audioTrack := webrtc.NewAudioTrack(streams)
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
if videoTrack == nil && audioTrack == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio tracks")
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -818,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
}
}

View File

@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.GET("/config", func(c *gin.Context) {
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.POST("/config", func(c *gin.Context) {
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
api := r.Group("/api")
{
// Public endpoints (no authentication required)
api.POST("/login", authMiddleware.LoginHandler)
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods. Doesn't require any authorization.
// These are available for anyone, but require the agent, to reach
// the camera.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods. Doesn't require any authorization.
// Will verify the current onvif settings.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
// Secured endpoints..
// Apply JWT authentication middleware.
// All routes registered below this line require a valid JWT token.
api.Use(authMiddleware.MiddlewareFunc())
{
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
}
}
return api

View File

@@ -90,9 +90,9 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
opts.SetCleanSession(true)
//opts.SetResumeSubs(true)
//opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
@@ -169,6 +169,12 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
return nil
}
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
// receive-hd-candidates) before it is considered stale and discarded. With CleanSession=false
// the MQTT broker may replay queued messages from previous sessions; this prevents the agent
// from setting up peer connections for viewers that are no longer waiting.
const maxSignalingAge = 30 * time.Second
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
@@ -274,6 +280,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
// We'll find out which message we received, and act accordingly.
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
// For time-sensitive WebRTC signaling messages, discard stale ones that may
// have been queued by the broker while CleanSession=false.
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
messageAge := time.Since(time.Unix(message.Timestamp, 0))
if messageAge > maxSignalingAge {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
" message (age: " + messageAge.Round(time.Second).String() + ")")
return
}
}
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)

View File

@@ -0,0 +1,270 @@
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg
package webrtc
/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations
#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>
// ── Transcoder handle ───────────────────────────────────────────────────
// Bundles every FFmpeg object one AAC decoding session needs, so the Go side
// can keep the whole session behind a single pointer.
typedef struct {
    AVCodecContext *codec_ctx;    // AAC decoder context (opened in aac_transcoder_create)
    AVCodecParserContext *parser; // splits the incoming byte stream into decodable frames
    SwrContext *swr_ctx;          // resampler; NULL until aac_init_swr() succeeds
    AVFrame *frame;               // reusable holder for each decoded frame
    AVPacket *pkt;                // reusable packet handed to the decoder
    int swr_initialized;          // set to 1 once swr_ctx has been initialised
    int in_sample_rate;           // decoder sample rate recorded when the resampler was set up
    int in_channels;              // decoder channel count recorded when the resampler was set up
} aac_transcoder_t;
// ── Create / Destroy ────────────────────────────────────────────────────
// Allocates a transcoder handle: looks up and opens the AAC decoder, creates
// the AAC parser and allocates the reusable frame/packet. The resampler is
// NOT created here (see aac_init_swr).
// Returns NULL on any failure; everything allocated up to that point is
// released before returning, so the caller never owns a partial handle.
static aac_transcoder_t* aac_transcoder_create(void) {
    const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
    if (!codec) return NULL;
    // calloc zeroes the struct, so swr_ctx starts NULL and swr_initialized 0.
    aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
    if (!t) return NULL;
    t->codec_ctx = avcodec_alloc_context3(codec);
    if (!t->codec_ctx) { free(t); return NULL; }
    if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
        avcodec_free_context(&t->codec_ctx);
        free(t);
        return NULL;
    }
    t->parser = av_parser_init(AV_CODEC_ID_AAC);
    if (!t->parser) {
        avcodec_free_context(&t->codec_ctx);
        free(t);
        return NULL;
    }
    // Allocate both before checking so a single cleanup path handles either failing.
    t->frame = av_frame_alloc();
    t->pkt = av_packet_alloc();
    if (!t->frame || !t->pkt) {
        if (t->frame) av_frame_free(&t->frame);
        if (t->pkt) av_packet_free(&t->pkt);
        av_parser_close(t->parser);
        avcodec_free_context(&t->codec_ctx);
        free(t);
        return NULL;
    }
    return t;
}
// Releases every FFmpeg object held by the handle and then the handle itself.
// Accepts NULL (no-op). Each member is checked before being freed, so a handle
// whose resampler was never initialised is handled as well.
static void aac_transcoder_destroy(aac_transcoder_t *t) {
    if (!t) return;
    if (t->swr_ctx) swr_free(&t->swr_ctx);
    if (t->frame) av_frame_free(&t->frame);
    if (t->pkt) av_packet_free(&t->pkt);
    if (t->parser) av_parser_close(t->parser);
    if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
    free(t);
}
// ── Lazy resampler init (called after the first decoded frame) ──────────
// Builds the resampler that converts the decoder's output (layout, sample
// format and rate are read from codec_ctx) into 8 kHz mono signed 16-bit PCM.
// On success records the input rate/channel count on the handle, sets
// swr_initialized and returns 0. Returns -1 if the resampler cannot be
// allocated or initialised; swr_ctx is left NULL in that case.
static int aac_init_swr(aac_transcoder_t *t) {
    int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
    // The decoder may not report a layout; derive a default one from the channel count.
    if (in_ch_layout == 0)
        in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
    t->swr_ctx = swr_alloc_set_opts(
        NULL,
        AV_CH_LAYOUT_MONO, // out: mono
        AV_SAMPLE_FMT_S16, // out: signed 16-bit
        8000, // out: 8 kHz
        in_ch_layout, // in: from decoder
        t->codec_ctx->sample_fmt, // in: from decoder
        t->codec_ctx->sample_rate, // in: from decoder
        0, NULL);
    if (!t->swr_ctx) return -1;
    if (swr_init(t->swr_ctx) < 0) {
        swr_free(&t->swr_ctx);
        return -1;
    }
    t->in_sample_rate = t->codec_ctx->sample_rate;
    t->in_channels = t->codec_ctx->channels;
    t->swr_initialized = 1;
    return 0;
}
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Parses `data` into AAC frames, decodes each one and resamples the result
// into a single growing output buffer.
//
//   t         transcoder handle from aac_transcoder_create()
//   data      input bytes (ADTS stream); NULL or data_size <= 0 is a no-op
//   out_pcm   receives the av_malloc'ed PCM buffer, or NULL when nothing was produced
//   out_size  receives the number of valid bytes in *out_pcm
//
// Returns 0 on success (including "no output yet") and -1 when a buffer
// allocation or the resampler initialisation fails.
// Caller must free *out_pcm with av_free() when non-NULL.
static int aac_transcode_to_pcm(aac_transcoder_t *t,
                                const uint8_t *data, int data_size,
                                uint8_t **out_pcm, int *out_size) {
    *out_pcm = NULL;
    *out_size = 0;
    if (!data || data_size <= 0) return 0;
    int buf_cap = 8192;
    uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
    if (!buf) return -1;
    int buf_len = 0;
    while (data_size > 0) {
        uint8_t *pout = NULL;
        int pout_size = 0;
        int used = av_parser_parse2(t->parser, t->codec_ctx,
                                    &pout, &pout_size,
                                    data, data_size,
                                    AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
        // A parser error stops consuming input; PCM gathered so far is still returned.
        if (used < 0) break;
        data += used;
        data_size -= used;
        // The parser has not assembled a complete frame yet; feed it more bytes.
        if (pout_size == 0) continue;
        // Feed parsed frame to decoder
        t->pkt->data = pout;
        t->pkt->size = pout_size;
        // A frame the decoder rejects is skipped rather than aborting the whole call.
        if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
        // Pull all decoded frames
        while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
            // The resampler needs the decoder's real output parameters, so it is
            // created only once the first frame has been decoded.
            if (!t->swr_initialized) {
                if (aac_init_swr(t) < 0) {
                    av_frame_unref(t->frame);
                    av_free(buf);
                    return -1;
                }
            }
            int out_samples = swr_get_out_samples(t->swr_ctx,
                                                  t->frame->nb_samples);
            // Fall back to the input sample count if no usable estimate is reported.
            if (out_samples <= 0) out_samples = t->frame->nb_samples;
            int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
            if (needed > buf_cap) {
                // Grow to twice the requirement to limit the number of reallocations.
                buf_cap = needed * 2;
                uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
                if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
                buf = tmp;
            }
            uint8_t *dst = buf + buf_len;
            int converted = swr_convert(t->swr_ctx,
                                        &dst, out_samples,
                                        (const uint8_t**)t->frame->extended_data,
                                        t->frame->nb_samples);
            // Only a positive sample count advances the write position.
            if (converted > 0)
                buf_len += converted * 2;
            av_frame_unref(t->frame);
        }
    }
    // Nothing decoded (e.g. the decoder is still buffering): report "no output".
    if (buf_len == 0) {
        av_free(buf);
        return 0;
    }
    *out_pcm = buf;
    *out_size = buf_len;
    return 0;
}
*/
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/zaf/g711"
)
// AACTranscodingAvailable tells callers whether AAC→PCMU transcoding can be
// used. This variant is compiled only with the "ffmpeg" build tag, where the
// decoder is linked in, so the answer is always true.
func AACTranscodingAvailable() bool {
	const linkedIn = true
	return linkedIn
}
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
//
// It owns native memory: call Close when the transcoder is no longer needed.
// NOTE(review): no locking is visible in this type — presumably a single
// goroutine is expected to use an instance at a time; confirm against callers.
type AACTranscoder struct {
	// handle points at the C-side state created by aac_transcoder_create;
	// it is set to nil by Close.
	handle *C.aac_transcoder_t
}
// NewAACTranscoder allocates the native FFmpeg decoding state and wraps it in
// an AACTranscoder. An error is returned when the native handle could not be
// created. The caller must Close the returned transcoder.
func NewAACTranscoder() (*AACTranscoder, error) {
	handle := C.aac_transcoder_create()
	if handle != nil {
		log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
		transcoder := &AACTranscoder{handle: handle}
		return transcoder, nil
	}
	return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
}
// Transcode feeds an ADTS buffer (one or more AAC frames) through the native
// decoder and resampler and returns the audio as G.711 µ-law bytes, ready for
// a PCMU WebRTC track.
//
// It returns (nil, nil) when there is nothing to do (nil receiver, closed
// handle, empty input) or when the decoder produced no output for this input.
// An error is returned only when the native decode/resample step fails.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
	if t == nil || t.handle == nil {
		return nil, nil
	}
	if len(adtsData) == 0 {
		return nil, nil
	}

	var (
		pcmPtr *C.uint8_t
		pcmLen C.int
	)
	status := C.aac_transcode_to_pcm(
		t.handle,
		(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
		C.int(len(adtsData)),
		&pcmPtr, &pcmLen,
	)
	switch {
	case status < 0:
		return nil, errors.New("AAC decode/resample failed")
	case pcmLen == 0, pcmPtr == nil:
		// decoder buffering, no output yet
		return nil, nil
	}
	// The PCM buffer was allocated on the C side; release it once we are done.
	defer C.av_free(unsafe.Pointer(pcmPtr))

	// Copy the S16LE PCM into Go memory before encoding it to µ-law.
	samples := C.GoBytes(unsafe.Pointer(pcmPtr), pcmLen)
	encoded := g711.EncodeUlaw(samples)

	// Report the resampler parameters a single time: in_sample_rate doubles as
	// the "already logged" marker and is cleared right after the message.
	resamplerReady := t.handle.swr_initialized == 1
	if resamplerReady && t.handle.in_sample_rate != 0 {
		rate := int(t.handle.in_sample_rate)
		channels := int(t.handle.in_channels)
		log.Log.Info(fmt.Sprintf(
			"webrtc.aac_transcoder: first output resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
			rate, channels))
		t.handle.in_sample_rate = 0
	}
	return encoded, nil
}
// Close frees the native FFmpeg state owned by the transcoder. It is a no-op
// on a nil receiver or when the handle has already been released, so calling
// it more than once is harmless.
func (t *AACTranscoder) Close() {
	if t == nil {
		return
	}
	if t.handle == nil {
		return
	}
	C.aac_transcoder_destroy(t.handle)
	t.handle = nil
	log.Log.Info("webrtc.aac_transcoder: transcoder closed")
}

View File

@@ -0,0 +1,205 @@
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg
package webrtc
import (
"bytes"
"errors"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/kerberos-io/agent/machinery/src/log"
)
// AACTranscodingAvailable tells callers whether AAC→PCMU transcoding can be
// used right now. In this build it depends on an "ffmpeg" executable being
// resolvable through PATH.
func AACTranscodingAvailable() bool {
	if _, lookupErr := exec.LookPath("ffmpeg"); lookupErr != nil {
		return false
	}
	return true
}
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
//
// Input is written to the process' stdin by Transcode; a background goroutine
// started by NewAACTranscoder copies the process' stdout into outBuf.
type AACTranscoder struct {
	cmd    *exec.Cmd      // the running ffmpeg process
	stdin  io.WriteCloser // pipe carrying ADTS AAC into ffmpeg
	stdout io.ReadCloser  // pipe carrying PCMU out of ffmpeg
	// stderrBuf is the fallback stderrString reads when cmd.Stderr is not a
	// *bytes.Buffer.
	stderrBuf bytes.Buffer
	mu        sync.Mutex   // serialises Transcode and guards closed
	outMu     sync.Mutex   // guards outBuf
	outBuf    bytes.Buffer // PCMU bytes read from ffmpeg and not yet returned
	closed    bool         // set by Close; Transcode then returns an error
	closeOnce sync.Once    // makes Close run its shutdown sequence only once
}
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
//
// It locates the ffmpeg binary in PATH, starts it so that it reads ADTS AAC
// from stdin and writes 8 kHz mono µ-law to stdout, and launches a goroutine
// that copies everything ffmpeg produces into the transcoder's output buffer.
// The goroutine ends when the stdout pipe reports an error or EOF, which
// happens once Close is called or the process exits.
//
// An error is returned when ffmpeg is not installed, when the pipes cannot be
// created or when the process fails to start. The caller must Close the
// returned transcoder to stop the subprocess.
func NewAACTranscoder() (*AACTranscoder, error) {
	ffmpegPath, err := exec.LookPath("ffmpeg")
	if err != nil {
		return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
	}
	log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)

	cmd := exec.Command(
		ffmpegPath,
		"-hide_banner",
		"-loglevel", "error",
		"-fflags", "+nobuffer",
		"-flags", "low_delay",
		"-f", "aac",
		"-i", "pipe:0",
		"-vn",
		"-ac", "1",
		"-ar", "8000",
		"-acodec", "pcm_mulaw",
		"-f", "mulaw",
		"pipe:1",
	)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		// Don't leave the already-created stdin pipe open on the error path.
		_ = stdin.Close()
		return nil, err
	}

	// stderr is collected in memory and read back through cmd.Stderr by
	// stderrString. The buffer is deliberately NOT copied into the struct: a
	// by-value copy would be a snapshot that never sees ffmpeg's output.
	cmd.Stderr = &bytes.Buffer{}

	if err := cmd.Start(); err != nil {
		// Best effort: release our pipe ends; closing an already closed pipe
		// only yields an error we can safely ignore.
		_ = stdin.Close()
		_ = stdout.Close()
		return nil, err
	}

	t := &AACTranscoder{
		cmd:    cmd,
		stdin:  stdin,
		stdout: stdout,
	}

	// Drain ffmpeg's stdout so the process never blocks on a full pipe.
	go func() {
		buf := make([]byte, 4096)
		for {
			n, readErr := stdout.Read(buf)
			if n > 0 {
				t.outMu.Lock()
				_, _ = t.outBuf.Write(buf[:n]) // bytes.Buffer.Write never returns an error
				buffered := t.outBuf.Len()
				t.outMu.Unlock()
				if buffered <= 8192 || buffered%16000 == 0 {
					log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
				}
			}
			if readErr != nil {
				if readErr != io.EOF {
					log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
				}
				return
			}
		}
	}()

	log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
	return t, nil
}
// Transcode pushes ADTS AAC into the ffmpeg process and hands back whatever
// PCMU bytes have been produced so far.
//
// After writing the input it polls the output buffer every 5 ms for at most
// 75 ms. The returned bytes are not necessarily the result of this very
// input: they are everything the reader goroutine has accumulated. If nothing
// arrives before the deadline it returns (nil, nil). An error is returned when
// the transcoder has been closed or when the write to ffmpeg fails.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
	if t == nil || len(adtsData) == 0 {
		return nil, nil
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.closed {
		return nil, errors.New("AAC transcoder is closed")
	}

	_, writeErr := t.stdin.Write(adtsData)
	if writeErr != nil {
		return nil, writeErr
	}
	if inputLen := len(adtsData); inputLen <= 512 || inputLen%1024 == 0 {
		log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(inputLen))
	}

	const (
		maxWait      = 75 * time.Millisecond
		pollInterval = 5 * time.Millisecond
	)
	deadline := time.Now().Add(maxWait)
	for {
		if pcmu := t.readAvailable(); len(pcmu) > 0 {
			log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(pcmu)))
			return pcmu, nil
		}
		if !time.Now().After(deadline) {
			time.Sleep(pollInterval)
			continue
		}

		// Deadline passed without output: surface ffmpeg's complaints, if any.
		ffmpegErrors := t.stderrString()
		if ffmpegErrors == "" {
			log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
		} else {
			log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + ffmpegErrors)
		}
		return nil, nil
	}
}
// readAvailable drains the output buffer: it returns a private copy of all
// PCMU bytes collected so far and empties the buffer, or nil when the buffer
// holds nothing.
func (t *AACTranscoder) readAvailable() []byte {
	t.outMu.Lock()
	defer t.outMu.Unlock()

	pending := t.outBuf.Bytes()
	if len(pending) == 0 {
		return nil
	}
	// Copy before Reset: the slice returned by Bytes aliases the buffer.
	drained := make([]byte, len(pending))
	copy(drained, pending)
	t.outBuf.Reset()
	return drained
}
// stderrString returns what ffmpeg has written to stderr so far, with
// surrounding whitespace removed. It returns "" for a nil receiver.
//
// The output is read from the *bytes.Buffer installed as cmd.Stderr; when no
// command (or no such buffer) is present it falls back to stderrBuf, so a
// transcoder without a process no longer causes a nil dereference.
//
// NOTE(review): os/exec fills cmd.Stderr from its own goroutine while the
// process runs, so reading it before the command has been waited on is
// presumably unsynchronised — confirm whether that matters here.
func (t *AACTranscoder) stderrString() string {
	if t == nil {
		return ""
	}
	if t.cmd != nil {
		if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
			return strings.TrimSpace(stderrBuf.String())
		}
	}
	return strings.TrimSpace(t.stderrBuf.String())
}
// Close stops the ffmpeg subprocess.
//
// It marks the transcoder closed (later Transcode calls return an error),
// closes both pipes, kills the process and waits for it. It is safe on a nil
// receiver and may be called repeatedly; only the first call does any work.
func (t *AACTranscoder) Close() {
	if t == nil {
		return
	}
	t.closeOnce.Do(func() {
		// Taking mu waits for an in-flight Transcode before the pipe goes away.
		t.mu.Lock()
		t.closed = true
		if t.stdin != nil {
			_ = t.stdin.Close() // best effort: the process is killed below anyway
		}
		t.mu.Unlock()

		if t.stdout != nil {
			// Unblocks the stdout reader goroutine.
			_ = t.stdout.Close()
		}

		if t.cmd == nil {
			return
		}
		if t.cmd.Process != nil {
			// An error here means the process is already gone.
			_ = t.cmd.Process.Kill()
			// Use Cmd.Wait rather than Process.Wait: it also waits for the
			// goroutine copying stderr to finish and releases the resources
			// tied to the Cmd. The "signal: killed" error is expected.
			_ = t.cmd.Wait()
		}
		if stderr := t.stderrString(); stderr != "" {
			log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
		}
	})
}

View File

@@ -0,0 +1,137 @@
package webrtc
import (
"io"
"sync"
"github.com/kerberos-io/agent/machinery/src/log"
pionWebRTC "github.com/pion/webrtc/v4"
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
const (
// peerSampleBuffer controls how many samples can be buffered per peer before
// dropping. Keeps slow peers from blocking the broadcaster.
peerSampleBuffer = 60
)
// peerTrack is a per-peer track with its own non-blocking sample channel.
type peerTrack struct {
	// track is the local track handed to the peer's PeerConnection.
	track *pionWebRTC.TrackLocalStaticSample
	// samples queues media for this peer (capacity peerSampleBuffer); it is
	// closed by RemovePeer to stop the writer goroutine.
	samples chan pionMedia.Sample
	// done is closed by the writer goroutine when it exits.
	done chan struct{}
}
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
// without blocking. Each peer gets its own TrackLocalStaticSample and a
// goroutine that drains samples independently, so a slow/congested peer
// cannot stall the others.
type TrackBroadcaster struct {
	mu sync.RWMutex // guards peers
	// peers maps a session key to that peer's track and queue.
	peers map[string]*peerTrack
	// mimeType, id and streamID are passed to every per-peer track created
	// by AddPeer.
	mimeType string
	id       string
	streamID string
}
// NewTrackBroadcaster returns an empty broadcaster (video or audio, depending
// on mimeType). The mime type, track id and stream id are remembered and
// applied to every per-peer track created later through AddPeer.
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
	broadcaster := new(TrackBroadcaster)
	broadcaster.peers = map[string]*peerTrack{}
	broadcaster.mimeType = mimeType
	broadcaster.id = id
	broadcaster.streamID = streamID
	return broadcaster
}
// AddPeer creates a new per-peer track and starts a writer goroutine.
// Returns the track to be added to the PeerConnection via AddTrack().
//
// If sessionKey is already registered, the new track replaces the old one and
// the old peer's sample channel is closed so that its writer goroutine exits
// instead of being leaked. An error is returned only when the underlying
// track cannot be created; nothing is registered in that case.
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
	track, err := pionWebRTC.NewTrackLocalStaticSample(
		pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
		b.id,
		b.streamID,
	)
	if err != nil {
		return nil, err
	}

	pt := &peerTrack{
		track:   track,
		samples: make(chan pionMedia.Sample, peerSampleBuffer),
		done:    make(chan struct{}),
	}

	b.mu.Lock()
	previous := b.peers[sessionKey]
	b.peers[sessionKey] = pt
	b.mu.Unlock()

	if previous != nil {
		// The old entry is no longer reachable through the map, so no sender
		// can still write to its channel: closing it here is safe and lets
		// the old writer goroutine terminate.
		close(previous.samples)
		log.Log.Warning("webrtc.broadcaster.AddPeer(): replaced existing peer track for " + sessionKey)
	}

	// Per-peer writer goroutine — drains samples independently.
	go func() {
		defer close(pt.done)
		for sample := range pt.samples {
			if err := pt.track.WriteSample(sample); err != nil {
				// The track was closed underneath us: nothing more to write.
				if err == io.ErrClosedPipe {
					return
				}
				log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
			}
		}
	}()

	log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
	return track, nil
}
// RemovePeer unregisters the peer identified by sessionKey and shuts down its
// writer goroutine, returning only after that goroutine has exited. Unknown
// keys are ignored.
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
	b.mu.Lock()
	peer, found := b.peers[sessionKey]
	// Deleting a key that is not present is a no-op.
	delete(b.peers, sessionKey)
	b.mu.Unlock()

	if !found {
		return
	}

	// The peer is out of the map, so no sender can reach the channel anymore.
	close(peer.samples)
	<-peer.done // wait for writer goroutine to finish
	log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
}
// WriteSample offers the sample to every registered peer and never blocks.
// A peer whose queue is full simply misses this sample (a warning is logged);
// delivery to the remaining peers is not affected.
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	for key, peer := range b.peers {
		queued := false
		select {
		case peer.samples <- sample:
			queued = true
		default:
			// Queue full: fall through and report the drop.
		}
		if !queued {
			log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + key)
		}
	}
}
// PeerCount reports how many peers are registered with the broadcaster at the
// moment of the call.
func (b *TrackBroadcaster) PeerCount() int {
	b.mu.RLock()
	count := len(b.peers)
	b.mu.RUnlock()
	return count
}
// Close unregisters every peer and waits for each writer goroutine to stop.
// The session keys are snapshotted under the lock first, because RemovePeer
// takes the same lock itself.
func (b *TrackBroadcaster) Close() {
	b.mu.Lock()
	sessionKeys := make([]string, 0, len(b.peers))
	for sessionKey := range b.peers {
		sessionKeys = append(sessionKeys, sessionKey)
	}
	b.mu.Unlock()

	for i := range sessionKeys {
		b.RemovePeer(sessionKeys[i])
	}
}

View File

@@ -4,13 +4,14 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
//"github.com/izern/go-fdkaac/fdkaac"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -29,9 +30,10 @@ const (
rtcpBufferSize = 1500
// Timeouts and intervals
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
maxLivePacketAge = 1500 * time.Millisecond
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
maxLivePacketAge = 1500 * time.Millisecond
disconnectGracePeriod = 5 * time.Second
// Track identifiers
trackStreamID = "kerberos-stream"
@@ -47,11 +49,16 @@ type ConnectionManager struct {
// peerConnectionWrapper wraps a peer connection with additional metadata
type peerConnectionWrapper struct {
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
disconnectMu sync.Mutex
disconnectTimer *time.Timer
sessionKey string
videoBroadcaster *TrackBroadcaster
audioBroadcaster *TrackBroadcaster
}
var globalConnectionManager = NewConnectionManager()
@@ -150,6 +157,15 @@ func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
}
// Remove per-peer tracks from broadcasters so the fan-out stops
// writing to this peer immediately.
if wrapper.videoBroadcaster != nil {
wrapper.videoBroadcaster.RemovePeer(sessionKey)
}
if wrapper.audioBroadcaster != nil {
wrapper.audioBroadcaster.RemovePeer(sessionKey)
}
globalConnectionManager.CloseCandidateChannel(sessionKey)
if wrapper.conn != nil {
@@ -236,7 +252,7 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
config := configuration.Config
deviceKey := config.Key
@@ -316,14 +332,25 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Create context for this connection
ctx, cancel := context.WithCancel(context.Background())
wrapper := &peerConnectionWrapper{
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
sessionKey: sessionKey,
videoBroadcaster: videoBroadcaster,
audioBroadcaster: audioBroadcaster,
}
// Create a per-peer video track from the broadcaster so writes
// to this peer are independent and non-blocking.
var videoSender *pionWebRTC.RTPSender = nil
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
if videoBroadcaster != nil {
peerVideoTrack, trackErr := videoBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer video track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if videoSender, err = peerConnection.AddTrack(peerVideoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
@@ -354,9 +381,16 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Create a per-peer audio track from the broadcaster.
var audioSender *pionWebRTC.RTPSender = nil
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
if audioBroadcaster != nil {
peerAudioTrack, trackErr := audioBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer audio track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if audioSender, err = peerConnection.AddTrack(peerAudioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
@@ -387,14 +421,61 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshake.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshake.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed:
case pionWebRTC.PeerConnectionStateDisconnected:
// Disconnected is a transient state that can recover.
// Start a grace period timer; if we don't recover, then cleanup.
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
wrapper.disconnectMu.Unlock()
case pionWebRTC.PeerConnectionStateFailed:
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateClosed:
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateConnected:
// Cancel any pending disconnect timer — connection recovered
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
if wrapper.connected.CompareAndSwap(false, true) {
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
@@ -548,6 +629,46 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
}
// NewVideoBroadcaster builds the video TrackBroadcaster for the given streams.
//
// It scans streams for an entry whose Name is "H264". When one is present it
// returns a broadcaster created with the H264 MIME type, the "video" track id
// and the shared trackStreamID. When none is present it logs an error and
// returns nil, so callers must be prepared for a nil result.
func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
	hasH264 := false
	for i := range streams {
		if streams[i].Name == "H264" {
			hasH264 = true
			break
		}
	}

	// Without an H264 stream there is nothing to broadcast as video.
	if !hasH264 {
		log.Log.Error("webrtc.main.NewVideoBroadcaster(): no H264 stream found")
		return nil
	}
	return NewTrackBroadcaster(pionWebRTC.MimeTypeH264, "video", trackStreamID)
}
// NewAudioBroadcaster builds the audio TrackBroadcaster for the given streams.
//
// Streams are examined in order. The first stream named "OPUS", "PCM_MULAW" or
// "PCM_ALAW" wins immediately and yields a broadcaster with the matching
// Opus / PCMU / PCMA MIME type. If no such stream exists but an "AAC" stream
// was seen, a PCMU broadcaster is returned (per the log message, intended to
// carry transcoded output). Otherwise nil is returned after logging either the
// unsupported audio codec names that were detected, or that the feed has no
// audio stream at all.
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
	var detected []string
	sawAAC := false

	for _, stream := range streams {
		// Remember every audio codec name so the error log can list them.
		if stream.IsAudio {
			detected = append(detected, stream.Name)
		}

		// Codecs that map directly onto a WebRTC track end the search.
		if stream.Name == "OPUS" {
			return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
		}
		if stream.Name == "PCM_MULAW" {
			return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
		}
		if stream.Name == "PCM_ALAW" {
			return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
		}

		// AAC is only a fallback: keep scanning in case a directly
		// supported codec appears later in the list.
		if stream.Name == "AAC" {
			sawAAC = true
		}
	}

	switch {
	case sawAAC:
		log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
		return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
	case len(detected) > 0:
		log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(detected, ", ")))
	default:
		log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
	}
	return nil
}
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
mimeType := pionWebRTC.MimeTypeH264
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
@@ -560,18 +681,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
var mimeType string
var audioCodecNames []string
hasAAC := false
for _, stream := range streams {
if stream.IsAudio {
audioCodecNames = append(audioCodecNames, stream.Name)
}
if stream.Name == "OPUS" {
mimeType = pionWebRTC.MimeTypeOpus
} else if stream.Name == "PCM_MULAW" {
mimeType = pionWebRTC.MimeTypePCMU
} else if stream.Name == "PCM_ALAW" {
mimeType = pionWebRTC.MimeTypePCMA
} else if stream.Name == "AAC" {
hasAAC = true
}
}
if mimeType == "" {
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
return nil
if hasAAC {
mimeType = pionWebRTC.MimeTypePCMU
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
return nil
} else {
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
return nil
}
}
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
if err != nil {
@@ -590,6 +726,11 @@ type streamState struct {
receivedKeyFrame bool
lastAudioSample *pionMedia.Sample
lastVideoSample *pionMedia.Sample
audioPacketsSeen int64
aacPacketsSeen int64
audioSamplesSent int64
aacNoOutput int64
aacErrors int64
}
// codecSupport tracks which codecs are available in the stream
@@ -661,17 +802,13 @@ func updateStreamState(communication *models.Communication, state *streamState)
}
// writeFinalSamples writes any remaining buffered samples
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
if state.lastVideoSample != nil && videoTrack != nil {
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final video sample: " + err.Error())
}
func writeFinalSamples(state *streamState, videoBroadcaster, audioBroadcaster *TrackBroadcaster) {
if state.lastVideoSample != nil && videoBroadcaster != nil {
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
if state.lastAudioSample != nil && audioTrack != nil {
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final audio sample: " + err.Error())
}
if state.lastAudioSample != nil && audioBroadcaster != nil {
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
}
@@ -710,9 +847,9 @@ func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback t
return fallback
}
// processVideoPacket processes a video packet and writes samples to the track
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
if videoTrack == nil {
// processVideoPacket processes a video packet and writes samples to the broadcaster
func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster *TrackBroadcaster, config models.Config) {
if videoBroadcaster == nil {
return
}
@@ -735,35 +872,61 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
if state.lastVideoSample != nil {
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
}
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
state.lastVideoSample = &sample
}
// processAudioPacket processes an audio packet and writes samples to the track
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
if audioTrack == nil {
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
if audioBroadcaster == nil {
return
}
if hasAAC {
// AAC transcoding not yet implemented
// TODO: Implement AAC to PCM_MULAW transcoding
return
state.audioPacketsSeen++
audioData := pkt.Data
if pkt.Codec == "AAC" {
state.aacPacketsSeen++
if transcoder == nil {
state.aacErrors++
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
}
return // no transcoder silently drop
}
pcmu, err := transcoder.Transcode(pkt.Data)
if err != nil {
state.aacErrors++
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
return
}
if len(pcmu) == 0 {
state.aacNoOutput++
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
}
return // decoder still buffering
}
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
}
audioData = pcmu
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
if state.lastAudioSample != nil {
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
state.audioSamplesSent++
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
}
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
state.lastAudioSample = &sample
@@ -778,13 +941,13 @@ func shouldDropPacketForLatency(pkt packets.Packet) bool {
return age > maxLivePacketAge
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, rtspClient capture.RTSPClient) {
config := configuration.Config
// Check if at least one track is available
if videoTrack == nil && audioTrack == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
// Check if at least one broadcaster is available
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio broadcasters are nil, cannot proceed")
return
}
@@ -796,8 +959,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
return
}
// Create AAC transcoder if needed (AAC → G.711 µ-law).
var aacTranscoder *AACTranscoder
if codecs.hasAAC && audioBroadcaster != nil {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
t, err := NewAACTranscoder()
if err != nil {
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
} else {
aacTranscoder = t
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
defer aacTranscoder.Close()
}
}
if config.Capture.TranscodingWebRTC == "true" {
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
}
// Initialize streaming state
@@ -807,7 +984,13 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}
defer func() {
writeFinalSamples(state, videoTrack, audioTrack)
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
if audioBroadcaster == nil {
return 0
}
return audioBroadcaster.PeerCount()
}()))
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
@@ -873,9 +1056,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
// Process video or audio packets
if pkt.IsVideo {
processVideoPacket(pkt, state, videoTrack, config)
processVideoPacket(pkt, state, videoBroadcaster, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
}
}
}