mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-13 00:09:17 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fbee60e9f | ||
|
|
d6c25df280 | ||
|
|
72a2d28e1e | ||
|
|
eb0972084f | ||
|
|
41a1d221fc | ||
|
|
eaacc93d2f | ||
|
|
0e6a004c23 | ||
|
|
617f854534 | ||
|
|
1bf8006055 | ||
|
|
ca0e426382 | ||
|
|
726d0722d9 | ||
|
|
d8f320b040 | ||
|
|
0131b87692 | ||
|
|
54e8198b65 | ||
|
|
3bfb68f950 |
@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
|
||||
COPY --chown=0:0 --from=build-machinery /dist /
|
||||
COPY --chown=0:0 --from=build-ui /dist /
|
||||
|
||||
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
|
||||
##################
|
||||
# Try running agent
|
||||
|
||||
@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
|
||||
COPY --chown=0:0 --from=build-machinery /dist /
|
||||
COPY --chown=0:0 --from=build-ui /dist /
|
||||
|
||||
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
|
||||
##################
|
||||
# Try running agent
|
||||
|
||||
@@ -800,17 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
// Check if we need to enable the live stream
|
||||
if config.Capture.Liveview != "false" {
|
||||
|
||||
// Should create a track here.
|
||||
// Create per-peer broadcasters instead of shared tracks.
|
||||
// Each viewer gets its own track with independent, non-blocking writes
|
||||
// so a slow/congested peer cannot stall the others.
|
||||
streams, _ := rtspClient.GetStreams()
|
||||
videoTrack := webrtc.NewVideoTrack(streams)
|
||||
audioTrack := webrtc.NewAudioTrack(streams)
|
||||
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
|
||||
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
|
||||
|
||||
if videoTrack == nil && audioTrack == nil {
|
||||
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio tracks")
|
||||
if videoBroadcaster == nil && audioBroadcaster == nil {
|
||||
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
|
||||
return
|
||||
}
|
||||
|
||||
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
|
||||
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
|
||||
|
||||
if config.Capture.ForwardWebRTC == "true" {
|
||||
|
||||
@@ -818,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
|
||||
for handshake := range communication.HandleLiveHDHandshake {
|
||||
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
|
||||
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
|
||||
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
|
||||
|
||||
// This is legacy should be removed in future! Now everything
|
||||
// lives under the /api prefix.
|
||||
r.GET("/config", func(c *gin.Context) {
|
||||
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// This is legacy should be removed in future! Now everything
|
||||
// lives under the /api prefix.
|
||||
r.POST("/config", func(c *gin.Context) {
|
||||
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api := r.Group("/api")
|
||||
{
|
||||
// Public endpoints (no authentication required)
|
||||
api.POST("/login", authMiddleware.LoginHandler)
|
||||
|
||||
api.GET("/dashboard", func(c *gin.Context) {
|
||||
components.GetDashboard(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/latest-events", func(c *gin.Context) {
|
||||
components.GetLatestEvents(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/days", func(c *gin.Context) {
|
||||
components.GetDays(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/config", func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/config", func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
// Will verify the hub settings.
|
||||
api.POST("/hub/verify", func(c *gin.Context) {
|
||||
cloud.VerifyHub(c)
|
||||
})
|
||||
|
||||
// Will verify the persistence settings.
|
||||
api.POST("/persistence/verify", func(c *gin.Context) {
|
||||
cloud.VerifyPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Will verify the secondary persistence settings.
|
||||
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
|
||||
cloud.VerifySecondaryPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Camera specific methods. Doesn't require any authorization.
|
||||
// These are available for anyone, but require the agent, to reach
|
||||
// the camera.
|
||||
|
||||
api.POST("/camera/restart", func(c *gin.Context) {
|
||||
components.RestartAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/stop", func(c *gin.Context) {
|
||||
components.StopAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/record", func(c *gin.Context) {
|
||||
components.MakeRecording(c, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
|
||||
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
|
||||
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// Onvif specific methods. Doesn't require any authorization.
|
||||
// Will verify the current onvif settings.
|
||||
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
|
||||
api.POST("/camera/onvif/login", LoginToOnvif)
|
||||
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
|
||||
api.POST("/camera/onvif/presets", GetOnvifPresets)
|
||||
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
|
||||
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
|
||||
api.POST("/camera/onvif/zoom", DoOnvifZoom)
|
||||
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
|
||||
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
|
||||
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
|
||||
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
|
||||
|
||||
// Secured endpoints..
|
||||
// Apply JWT authentication middleware.
|
||||
// All routes registered below this line require a valid JWT token.
|
||||
api.Use(authMiddleware.MiddlewareFunc())
|
||||
{
|
||||
api.GET("/dashboard", func(c *gin.Context) {
|
||||
components.GetDashboard(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/latest-events", func(c *gin.Context) {
|
||||
components.GetLatestEvents(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/days", func(c *gin.Context) {
|
||||
components.GetDays(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/config", func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/config", func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
// Will verify the hub settings.
|
||||
api.POST("/hub/verify", func(c *gin.Context) {
|
||||
cloud.VerifyHub(c)
|
||||
})
|
||||
|
||||
// Will verify the persistence settings.
|
||||
api.POST("/persistence/verify", func(c *gin.Context) {
|
||||
cloud.VerifyPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Will verify the secondary persistence settings.
|
||||
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
|
||||
cloud.VerifySecondaryPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Camera specific methods.
|
||||
api.POST("/camera/restart", func(c *gin.Context) {
|
||||
components.RestartAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/stop", func(c *gin.Context) {
|
||||
components.StopAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/record", func(c *gin.Context) {
|
||||
components.MakeRecording(c, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
|
||||
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
|
||||
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// Onvif specific methods.
|
||||
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
|
||||
api.POST("/camera/onvif/login", LoginToOnvif)
|
||||
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
|
||||
api.POST("/camera/onvif/presets", GetOnvifPresets)
|
||||
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
|
||||
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
|
||||
api.POST("/camera/onvif/zoom", DoOnvifZoom)
|
||||
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
|
||||
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
|
||||
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
|
||||
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
|
||||
}
|
||||
}
|
||||
return api
|
||||
|
||||
@@ -90,9 +90,9 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
|
||||
|
||||
// Some extra options to make sure the connection behaves
|
||||
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
|
||||
opts.SetCleanSession(false)
|
||||
opts.SetResumeSubs(true)
|
||||
opts.SetStore(mqtt.NewMemoryStore())
|
||||
opts.SetCleanSession(true)
|
||||
//opts.SetResumeSubs(true)
|
||||
//opts.SetStore(mqtt.NewMemoryStore())
|
||||
opts.SetConnectRetry(true)
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetryInterval(5 * time.Second)
|
||||
@@ -169,6 +169,12 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
|
||||
// receive-hd-candidates) before it is considered stale and discarded. With CleanSession=false
|
||||
// the MQTT broker may replay queued messages from previous sessions; this prevents the agent
|
||||
// from setting up peer connections for viewers that are no longer waiting.
|
||||
const maxSignalingAge = 30 * time.Second
|
||||
|
||||
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
|
||||
if hubKey == "" {
|
||||
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
|
||||
@@ -274,6 +280,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
|
||||
|
||||
// We'll find out which message we received, and act accordingly.
|
||||
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
|
||||
|
||||
// For time-sensitive WebRTC signaling messages, discard stale ones that may
|
||||
// have been queued by the broker while CleanSession=false.
|
||||
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
|
||||
messageAge := time.Since(time.Unix(message.Timestamp, 0))
|
||||
if messageAge > maxSignalingAge {
|
||||
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
|
||||
" message (age: " + messageAge.Round(time.Second).String() + ")")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
switch payload.Action {
|
||||
case "record":
|
||||
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
|
||||
|
||||
270
machinery/src/webrtc/aac_transcoder_ffmpeg.go
Normal file
270
machinery/src/webrtc/aac_transcoder_ffmpeg.go
Normal file
@@ -0,0 +1,270 @@
|
||||
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
|
||||
// Build with: go build -tags ffmpeg ...
|
||||
//
|
||||
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
|
||||
// and an AAC decoder compiled into the FFmpeg build (usually the default).
|
||||
//
|
||||
//go:build ffmpeg
|
||||
|
||||
package webrtc
|
||||
|
||||
/*
|
||||
#cgo pkg-config: libavcodec libavutil libswresample
|
||||
#cgo CFLAGS: -Wno-deprecated-declarations
|
||||
|
||||
#include <libavcodec/avcodec.h>
|
||||
#include <libavutil/channel_layout.h>
|
||||
#include <libavutil/frame.h>
|
||||
#include <libavutil/mem.h>
|
||||
#include <libavutil/opt.h>
|
||||
#include <libswresample/swresample.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
// ── Transcoder handle ───────────────────────────────────────────────────
|
||||
|
||||
typedef struct {
|
||||
AVCodecContext *codec_ctx;
|
||||
AVCodecParserContext *parser;
|
||||
SwrContext *swr_ctx;
|
||||
AVFrame *frame;
|
||||
AVPacket *pkt;
|
||||
int swr_initialized;
|
||||
int in_sample_rate;
|
||||
int in_channels;
|
||||
} aac_transcoder_t;
|
||||
|
||||
// ── Create / Destroy ────────────────────────────────────────────────────
|
||||
|
||||
static aac_transcoder_t* aac_transcoder_create(void) {
|
||||
const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
|
||||
if (!codec) return NULL;
|
||||
|
||||
aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
|
||||
if (!t) return NULL;
|
||||
|
||||
t->codec_ctx = avcodec_alloc_context3(codec);
|
||||
if (!t->codec_ctx) { free(t); return NULL; }
|
||||
|
||||
if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
|
||||
avcodec_free_context(&t->codec_ctx);
|
||||
free(t);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
t->parser = av_parser_init(AV_CODEC_ID_AAC);
|
||||
if (!t->parser) {
|
||||
avcodec_free_context(&t->codec_ctx);
|
||||
free(t);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
t->frame = av_frame_alloc();
|
||||
t->pkt = av_packet_alloc();
|
||||
if (!t->frame || !t->pkt) {
|
||||
if (t->frame) av_frame_free(&t->frame);
|
||||
if (t->pkt) av_packet_free(&t->pkt);
|
||||
av_parser_close(t->parser);
|
||||
avcodec_free_context(&t->codec_ctx);
|
||||
free(t);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
static void aac_transcoder_destroy(aac_transcoder_t *t) {
|
||||
if (!t) return;
|
||||
if (t->swr_ctx) swr_free(&t->swr_ctx);
|
||||
if (t->frame) av_frame_free(&t->frame);
|
||||
if (t->pkt) av_packet_free(&t->pkt);
|
||||
if (t->parser) av_parser_close(t->parser);
|
||||
if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
|
||||
free(t);
|
||||
}
|
||||
|
||||
// ── Lazy resampler init (called after the first decoded frame) ──────────
|
||||
|
||||
static int aac_init_swr(aac_transcoder_t *t) {
|
||||
int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
|
||||
if (in_ch_layout == 0)
|
||||
in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
|
||||
|
||||
t->swr_ctx = swr_alloc_set_opts(
|
||||
NULL,
|
||||
AV_CH_LAYOUT_MONO, // out: mono
|
||||
AV_SAMPLE_FMT_S16, // out: signed 16-bit
|
||||
8000, // out: 8 kHz
|
||||
in_ch_layout, // in: from decoder
|
||||
t->codec_ctx->sample_fmt, // in: from decoder
|
||||
t->codec_ctx->sample_rate, // in: from decoder
|
||||
0, NULL);
|
||||
|
||||
if (!t->swr_ctx) return -1;
|
||||
if (swr_init(t->swr_ctx) < 0) {
|
||||
swr_free(&t->swr_ctx);
|
||||
return -1;
|
||||
}
|
||||
|
||||
t->in_sample_rate = t->codec_ctx->sample_rate;
|
||||
t->in_channels = t->codec_ctx->channels;
|
||||
t->swr_initialized = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
|
||||
// Caller must free *out_pcm with av_free() when non-NULL.
|
||||
|
||||
static int aac_transcode_to_pcm(aac_transcoder_t *t,
|
||||
const uint8_t *data, int data_size,
|
||||
uint8_t **out_pcm, int *out_size) {
|
||||
*out_pcm = NULL;
|
||||
*out_size = 0;
|
||||
if (!data || data_size <= 0) return 0;
|
||||
|
||||
int buf_cap = 8192;
|
||||
uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
|
||||
if (!buf) return -1;
|
||||
int buf_len = 0;
|
||||
|
||||
while (data_size > 0) {
|
||||
uint8_t *pout = NULL;
|
||||
int pout_size = 0;
|
||||
|
||||
int used = av_parser_parse2(t->parser, t->codec_ctx,
|
||||
&pout, &pout_size,
|
||||
data, data_size,
|
||||
AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
|
||||
if (used < 0) break;
|
||||
data += used;
|
||||
data_size -= used;
|
||||
if (pout_size == 0) continue;
|
||||
|
||||
// Feed parsed frame to decoder
|
||||
t->pkt->data = pout;
|
||||
t->pkt->size = pout_size;
|
||||
if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
|
||||
|
||||
// Pull all decoded frames
|
||||
while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
|
||||
if (!t->swr_initialized) {
|
||||
if (aac_init_swr(t) < 0) {
|
||||
av_frame_unref(t->frame);
|
||||
av_free(buf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int out_samples = swr_get_out_samples(t->swr_ctx,
|
||||
t->frame->nb_samples);
|
||||
if (out_samples <= 0) out_samples = t->frame->nb_samples;
|
||||
|
||||
int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
|
||||
if (needed > buf_cap) {
|
||||
buf_cap = needed * 2;
|
||||
uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
|
||||
if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
|
||||
buf = tmp;
|
||||
}
|
||||
|
||||
uint8_t *dst = buf + buf_len;
|
||||
int converted = swr_convert(t->swr_ctx,
|
||||
&dst, out_samples,
|
||||
(const uint8_t**)t->frame->extended_data,
|
||||
t->frame->nb_samples);
|
||||
if (converted > 0)
|
||||
buf_len += converted * 2;
|
||||
|
||||
av_frame_unref(t->frame);
|
||||
}
|
||||
}
|
||||
|
||||
if (buf_len == 0) {
|
||||
av_free(buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
*out_pcm = buf;
|
||||
*out_size = buf_len;
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/zaf/g711"
|
||||
)
|
||||
|
||||
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
|
||||
// is compiled in (requires the "ffmpeg" build tag).
|
||||
func AACTranscodingAvailable() bool { return true }
|
||||
|
||||
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
|
||||
// and encodes it as G.711 µ-law for WebRTC transport.
|
||||
type AACTranscoder struct {
|
||||
handle *C.aac_transcoder_t
|
||||
}
|
||||
|
||||
// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
|
||||
func NewAACTranscoder() (*AACTranscoder, error) {
|
||||
h := C.aac_transcoder_create()
|
||||
if h == nil {
|
||||
return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
|
||||
}
|
||||
log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
|
||||
return &AACTranscoder{handle: h}, nil
|
||||
}
|
||||
|
||||
// Transcode converts an ADTS buffer (one or more AAC frames) into
|
||||
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
|
||||
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
|
||||
if t == nil || t.handle == nil || len(adtsData) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var outPCM *C.uint8_t
|
||||
var outSize C.int
|
||||
|
||||
ret := C.aac_transcode_to_pcm(
|
||||
t.handle,
|
||||
(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
|
||||
C.int(len(adtsData)),
|
||||
&outPCM, &outSize,
|
||||
)
|
||||
if ret < 0 {
|
||||
return nil, errors.New("AAC decode/resample failed")
|
||||
}
|
||||
if outSize == 0 || outPCM == nil {
|
||||
return nil, nil // decoder buffering, no output yet
|
||||
}
|
||||
defer C.av_free(unsafe.Pointer(outPCM))
|
||||
|
||||
// Copy S16LE PCM to Go slice, then encode to µ-law.
|
||||
pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
|
||||
ulaw := g711.EncodeUlaw(pcm)
|
||||
|
||||
// Log resampler details once.
|
||||
if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
|
||||
log.Log.Info(fmt.Sprintf(
|
||||
"webrtc.aac_transcoder: first output – resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
|
||||
int(t.handle.in_sample_rate), int(t.handle.in_channels)))
|
||||
// Prevent repeated logging by zeroing the field we check.
|
||||
t.handle.in_sample_rate = 0
|
||||
}
|
||||
|
||||
return ulaw, nil
|
||||
}
|
||||
|
||||
// Close releases all FFmpeg resources held by the transcoder.
|
||||
func (t *AACTranscoder) Close() {
|
||||
if t != nil && t.handle != nil {
|
||||
C.aac_transcoder_destroy(t.handle)
|
||||
t.handle = nil
|
||||
log.Log.Info("webrtc.aac_transcoder: transcoder closed")
|
||||
}
|
||||
}
|
||||
205
machinery/src/webrtc/aac_transcoder_stub.go
Normal file
205
machinery/src/webrtc/aac_transcoder_stub.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
|
||||
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
|
||||
//
|
||||
//go:build !ffmpeg
|
||||
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
)
|
||||
|
||||
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
|
||||
// is available in the current runtime.
|
||||
func AACTranscodingAvailable() bool {
|
||||
_, err := exec.LookPath("ffmpeg")
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
|
||||
type AACTranscoder struct {
|
||||
cmd *exec.Cmd
|
||||
stdin io.WriteCloser
|
||||
stdout io.ReadCloser
|
||||
stderrBuf bytes.Buffer
|
||||
|
||||
mu sync.Mutex
|
||||
outMu sync.Mutex
|
||||
outBuf bytes.Buffer
|
||||
closed bool
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
|
||||
func NewAACTranscoder() (*AACTranscoder, error) {
|
||||
ffmpegPath, err := exec.LookPath("ffmpeg")
|
||||
if err != nil {
|
||||
return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
|
||||
}
|
||||
log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)
|
||||
|
||||
cmd := exec.Command(
|
||||
ffmpegPath,
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
"-fflags", "+nobuffer",
|
||||
"-flags", "low_delay",
|
||||
"-f", "aac",
|
||||
"-i", "pipe:0",
|
||||
"-vn",
|
||||
"-ac", "1",
|
||||
"-ar", "8000",
|
||||
"-acodec", "pcm_mulaw",
|
||||
"-f", "mulaw",
|
||||
"pipe:1",
|
||||
)
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cmd.Stderr = &bytes.Buffer{}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &AACTranscoder{
|
||||
cmd: cmd,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
}
|
||||
if stderrBuf, ok := cmd.Stderr.(*bytes.Buffer); ok {
|
||||
t.stderrBuf = *stderrBuf
|
||||
}
|
||||
|
||||
go func() {
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, readErr := stdout.Read(buf)
|
||||
if n > 0 {
|
||||
t.outMu.Lock()
|
||||
_, _ = t.outBuf.Write(buf[:n])
|
||||
buffered := t.outBuf.Len()
|
||||
t.outMu.Unlock()
|
||||
if buffered <= 8192 || buffered%16000 == 0 {
|
||||
log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
|
||||
}
|
||||
}
|
||||
if readErr != nil {
|
||||
if readErr != io.EOF {
|
||||
log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
|
||||
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
|
||||
if t == nil || len(adtsData) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if t.closed {
|
||||
return nil, errors.New("AAC transcoder is closed")
|
||||
}
|
||||
|
||||
if _, err := t.stdin.Write(adtsData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
|
||||
log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(75 * time.Millisecond)
|
||||
for {
|
||||
data := t.readAvailable()
|
||||
if len(data) > 0 {
|
||||
log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
|
||||
return data, nil
|
||||
}
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
if stderr := t.stderrString(); stderr != "" {
|
||||
log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
|
||||
} else {
|
||||
log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *AACTranscoder) readAvailable() []byte {
|
||||
t.outMu.Lock()
|
||||
defer t.outMu.Unlock()
|
||||
|
||||
if t.outBuf.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
out := make([]byte, t.outBuf.Len())
|
||||
copy(out, t.outBuf.Bytes())
|
||||
t.outBuf.Reset()
|
||||
return out
|
||||
}
|
||||
|
||||
func (t *AACTranscoder) stderrString() string {
|
||||
if t == nil {
|
||||
return ""
|
||||
}
|
||||
if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
|
||||
return strings.TrimSpace(stderrBuf.String())
|
||||
}
|
||||
return strings.TrimSpace(t.stderrBuf.String())
|
||||
}
|
||||
|
||||
// Close stops the ffmpeg subprocess.
|
||||
func (t *AACTranscoder) Close() {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.closeOnce.Do(func() {
|
||||
t.mu.Lock()
|
||||
t.closed = true
|
||||
if t.stdin != nil {
|
||||
_ = t.stdin.Close()
|
||||
}
|
||||
t.mu.Unlock()
|
||||
|
||||
if t.stdout != nil {
|
||||
_ = t.stdout.Close()
|
||||
}
|
||||
|
||||
if t.cmd != nil {
|
||||
_ = t.cmd.Process.Kill()
|
||||
_, _ = t.cmd.Process.Wait()
|
||||
if stderr := t.stderrString(); stderr != "" {
|
||||
log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
137
machinery/src/webrtc/broadcaster.go
Normal file
137
machinery/src/webrtc/broadcaster.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
pionWebRTC "github.com/pion/webrtc/v4"
|
||||
pionMedia "github.com/pion/webrtc/v4/pkg/media"
|
||||
)
|
||||
|
||||
const (
|
||||
// peerSampleBuffer controls how many samples can be buffered per peer before
|
||||
// dropping. Keeps slow peers from blocking the broadcaster.
|
||||
peerSampleBuffer = 60
|
||||
)
|
||||
|
||||
// peerTrack is a per-peer track with its own non-blocking sample channel.
|
||||
type peerTrack struct {
|
||||
track *pionWebRTC.TrackLocalStaticSample
|
||||
samples chan pionMedia.Sample
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
|
||||
// without blocking. Each peer gets its own TrackLocalStaticSample and a
|
||||
// goroutine that drains samples independently, so a slow/congested peer
|
||||
// cannot stall the others.
|
||||
type TrackBroadcaster struct {
|
||||
mu sync.RWMutex
|
||||
peers map[string]*peerTrack
|
||||
mimeType string
|
||||
id string
|
||||
streamID string
|
||||
}
|
||||
|
||||
// NewTrackBroadcaster creates a new broadcaster for either video or audio.
|
||||
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
|
||||
return &TrackBroadcaster{
|
||||
peers: make(map[string]*peerTrack),
|
||||
mimeType: mimeType,
|
||||
id: id,
|
||||
streamID: streamID,
|
||||
}
|
||||
}
|
||||
|
||||
// AddPeer creates a new per-peer track and starts a writer goroutine.
|
||||
// Returns the track to be added to the PeerConnection via AddTrack().
|
||||
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
|
||||
track, err := pionWebRTC.NewTrackLocalStaticSample(
|
||||
pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
|
||||
b.id,
|
||||
b.streamID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pt := &peerTrack{
|
||||
track: track,
|
||||
samples: make(chan pionMedia.Sample, peerSampleBuffer),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
b.peers[sessionKey] = pt
|
||||
b.mu.Unlock()
|
||||
|
||||
// Per-peer writer goroutine — drains samples independently.
|
||||
go func() {
|
||||
defer close(pt.done)
|
||||
for sample := range pt.samples {
|
||||
if err := pt.track.WriteSample(sample); err != nil {
|
||||
if err == io.ErrClosedPipe {
|
||||
return
|
||||
}
|
||||
log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
|
||||
return track, nil
|
||||
}
|
||||
|
||||
// RemovePeer stops the writer goroutine and removes the peer.
|
||||
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
|
||||
b.mu.Lock()
|
||||
pt, exists := b.peers[sessionKey]
|
||||
if exists {
|
||||
delete(b.peers, sessionKey)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
if exists {
|
||||
close(pt.samples)
|
||||
<-pt.done // wait for writer goroutine to finish
|
||||
log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteSample fans out a sample to all connected peers without blocking.
|
||||
// If a peer's buffer is full (slow consumer), the sample is dropped for
|
||||
// that peer only — other peers are unaffected.
|
||||
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
for sessionKey, pt := range b.peers {
|
||||
select {
|
||||
case pt.samples <- sample:
|
||||
default:
|
||||
log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + sessionKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PeerCount returns the current number of connected peers.
|
||||
func (b *TrackBroadcaster) PeerCount() int {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
return len(b.peers)
|
||||
}
|
||||
|
||||
// Close removes all peers and stops all writer goroutines.
|
||||
func (b *TrackBroadcaster) Close() {
|
||||
b.mu.Lock()
|
||||
keys := make([]string, 0, len(b.peers))
|
||||
for k := range b.peers {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
for _, key := range keys {
|
||||
b.RemovePeer(key)
|
||||
}
|
||||
}
|
||||
@@ -4,13 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
//"github.com/izern/go-fdkaac/fdkaac"
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
@@ -29,9 +30,10 @@ const (
|
||||
rtcpBufferSize = 1500
|
||||
|
||||
// Timeouts and intervals
|
||||
keepAliveTimeout = 15 * time.Second
|
||||
defaultTimeout = 10 * time.Second
|
||||
maxLivePacketAge = 1500 * time.Millisecond
|
||||
keepAliveTimeout = 15 * time.Second
|
||||
defaultTimeout = 10 * time.Second
|
||||
maxLivePacketAge = 1500 * time.Millisecond
|
||||
disconnectGracePeriod = 5 * time.Second
|
||||
|
||||
// Track identifiers
|
||||
trackStreamID = "kerberos-stream"
|
||||
@@ -47,11 +49,16 @@ type ConnectionManager struct {
|
||||
|
||||
// peerConnectionWrapper wraps a peer connection with additional metadata
|
||||
type peerConnectionWrapper struct {
|
||||
conn *pionWebRTC.PeerConnection
|
||||
cancelCtx context.CancelFunc
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
connected atomic.Bool
|
||||
conn *pionWebRTC.PeerConnection
|
||||
cancelCtx context.CancelFunc
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
connected atomic.Bool
|
||||
disconnectMu sync.Mutex
|
||||
disconnectTimer *time.Timer
|
||||
sessionKey string
|
||||
videoBroadcaster *TrackBroadcaster
|
||||
audioBroadcaster *TrackBroadcaster
|
||||
}
|
||||
|
||||
var globalConnectionManager = NewConnectionManager()
|
||||
@@ -150,6 +157,15 @@ func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
|
||||
}
|
||||
|
||||
// Remove per-peer tracks from broadcasters so the fan-out stops
|
||||
// writing to this peer immediately.
|
||||
if wrapper.videoBroadcaster != nil {
|
||||
wrapper.videoBroadcaster.RemovePeer(sessionKey)
|
||||
}
|
||||
if wrapper.audioBroadcaster != nil {
|
||||
wrapper.audioBroadcaster.RemovePeer(sessionKey)
|
||||
}
|
||||
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
|
||||
if wrapper.conn != nil {
|
||||
@@ -236,7 +252,7 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
|
||||
return nil
|
||||
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
|
||||
|
||||
config := configuration.Config
|
||||
deviceKey := config.Key
|
||||
@@ -316,14 +332,25 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
// Create context for this connection
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wrapper := &peerConnectionWrapper{
|
||||
conn: peerConnection,
|
||||
cancelCtx: cancel,
|
||||
done: make(chan struct{}),
|
||||
conn: peerConnection,
|
||||
cancelCtx: cancel,
|
||||
done: make(chan struct{}),
|
||||
sessionKey: sessionKey,
|
||||
videoBroadcaster: videoBroadcaster,
|
||||
audioBroadcaster: audioBroadcaster,
|
||||
}
|
||||
|
||||
// Create a per-peer video track from the broadcaster so writes
|
||||
// to this peer are independent and non-blocking.
|
||||
var videoSender *pionWebRTC.RTPSender = nil
|
||||
if videoTrack != nil {
|
||||
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
|
||||
if videoBroadcaster != nil {
|
||||
peerVideoTrack, trackErr := videoBroadcaster.AddPeer(sessionKey)
|
||||
if trackErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer video track: " + trackErr.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
if videoSender, err = peerConnection.AddTrack(peerVideoTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
@@ -354,9 +381,16 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}()
|
||||
}
|
||||
|
||||
// Create a per-peer audio track from the broadcaster.
|
||||
var audioSender *pionWebRTC.RTPSender = nil
|
||||
if audioTrack != nil {
|
||||
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
|
||||
if audioBroadcaster != nil {
|
||||
peerAudioTrack, trackErr := audioBroadcaster.AddPeer(sessionKey)
|
||||
if trackErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer audio track: " + trackErr.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
if audioSender, err = peerConnection.AddTrack(peerAudioTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
@@ -387,14 +421,61 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}()
|
||||
}
|
||||
|
||||
// Log ICE connection state changes for diagnostics
|
||||
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
|
||||
" (session: " + handshake.SessionID + ")")
|
||||
})
|
||||
|
||||
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
|
||||
" (session: " + handshake.SessionID + ")")
|
||||
|
||||
switch connectionState {
|
||||
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed:
|
||||
case pionWebRTC.PeerConnectionStateDisconnected:
|
||||
// Disconnected is a transient state that can recover.
|
||||
// Start a grace period timer; if we don't recover, then cleanup.
|
||||
wrapper.disconnectMu.Lock()
|
||||
if wrapper.disconnectTimer == nil {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
|
||||
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
|
||||
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
})
|
||||
}
|
||||
wrapper.disconnectMu.Unlock()
|
||||
|
||||
case pionWebRTC.PeerConnectionStateFailed:
|
||||
// Stop any pending disconnect timer
|
||||
wrapper.disconnectMu.Lock()
|
||||
if wrapper.disconnectTimer != nil {
|
||||
wrapper.disconnectTimer.Stop()
|
||||
wrapper.disconnectTimer = nil
|
||||
}
|
||||
wrapper.disconnectMu.Unlock()
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
|
||||
case pionWebRTC.PeerConnectionStateClosed:
|
||||
// Stop any pending disconnect timer
|
||||
wrapper.disconnectMu.Lock()
|
||||
if wrapper.disconnectTimer != nil {
|
||||
wrapper.disconnectTimer.Stop()
|
||||
wrapper.disconnectTimer = nil
|
||||
}
|
||||
wrapper.disconnectMu.Unlock()
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
|
||||
case pionWebRTC.PeerConnectionStateConnected:
|
||||
// Cancel any pending disconnect timer — connection recovered
|
||||
wrapper.disconnectMu.Lock()
|
||||
if wrapper.disconnectTimer != nil {
|
||||
wrapper.disconnectTimer.Stop()
|
||||
wrapper.disconnectTimer = nil
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
|
||||
}
|
||||
wrapper.disconnectMu.Unlock()
|
||||
|
||||
if wrapper.connected.CompareAndSwap(false, true) {
|
||||
count := globalConnectionManager.IncrementPeerCount()
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
|
||||
@@ -548,6 +629,46 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}
|
||||
}
|
||||
|
||||
func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
|
||||
// Verify H264 is available (same check as NewVideoTrack)
|
||||
for _, s := range streams {
|
||||
if s.Name == "H264" {
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypeH264, "video", trackStreamID)
|
||||
}
|
||||
}
|
||||
log.Log.Error("webrtc.main.NewVideoBroadcaster(): no H264 stream found")
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
|
||||
var audioCodecNames []string
|
||||
hasAAC := false
|
||||
for _, s := range streams {
|
||||
if s.IsAudio {
|
||||
audioCodecNames = append(audioCodecNames, s.Name)
|
||||
}
|
||||
switch s.Name {
|
||||
case "OPUS":
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
|
||||
case "PCM_MULAW":
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
|
||||
case "PCM_ALAW":
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
|
||||
case "AAC":
|
||||
hasAAC = true
|
||||
}
|
||||
}
|
||||
if hasAAC {
|
||||
log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
|
||||
} else if len(audioCodecNames) > 0 {
|
||||
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
|
||||
mimeType := pionWebRTC.MimeTypeH264
|
||||
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
|
||||
@@ -560,18 +681,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
|
||||
|
||||
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
|
||||
var mimeType string
|
||||
var audioCodecNames []string
|
||||
hasAAC := false
|
||||
for _, stream := range streams {
|
||||
if stream.IsAudio {
|
||||
audioCodecNames = append(audioCodecNames, stream.Name)
|
||||
}
|
||||
if stream.Name == "OPUS" {
|
||||
mimeType = pionWebRTC.MimeTypeOpus
|
||||
} else if stream.Name == "PCM_MULAW" {
|
||||
mimeType = pionWebRTC.MimeTypePCMU
|
||||
} else if stream.Name == "PCM_ALAW" {
|
||||
mimeType = pionWebRTC.MimeTypePCMA
|
||||
} else if stream.Name == "AAC" {
|
||||
hasAAC = true
|
||||
}
|
||||
}
|
||||
if mimeType == "" {
|
||||
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
|
||||
return nil
|
||||
if hasAAC {
|
||||
mimeType = pionWebRTC.MimeTypePCMU
|
||||
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
|
||||
} else if len(audioCodecNames) > 0 {
|
||||
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
|
||||
return nil
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
|
||||
if err != nil {
|
||||
@@ -590,6 +726,11 @@ type streamState struct {
|
||||
receivedKeyFrame bool
|
||||
lastAudioSample *pionMedia.Sample
|
||||
lastVideoSample *pionMedia.Sample
|
||||
audioPacketsSeen int64
|
||||
aacPacketsSeen int64
|
||||
audioSamplesSent int64
|
||||
aacNoOutput int64
|
||||
aacErrors int64
|
||||
}
|
||||
|
||||
// codecSupport tracks which codecs are available in the stream
|
||||
@@ -661,17 +802,13 @@ func updateStreamState(communication *models.Communication, state *streamState)
|
||||
}
|
||||
|
||||
// writeFinalSamples writes any remaining buffered samples
|
||||
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
|
||||
if state.lastVideoSample != nil && videoTrack != nil {
|
||||
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final video sample: " + err.Error())
|
||||
}
|
||||
func writeFinalSamples(state *streamState, videoBroadcaster, audioBroadcaster *TrackBroadcaster) {
|
||||
if state.lastVideoSample != nil && videoBroadcaster != nil {
|
||||
videoBroadcaster.WriteSample(*state.lastVideoSample)
|
||||
}
|
||||
|
||||
if state.lastAudioSample != nil && audioTrack != nil {
|
||||
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final audio sample: " + err.Error())
|
||||
}
|
||||
if state.lastAudioSample != nil && audioBroadcaster != nil {
|
||||
audioBroadcaster.WriteSample(*state.lastAudioSample)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -710,9 +847,9 @@ func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback t
|
||||
return fallback
|
||||
}
|
||||
|
||||
// processVideoPacket processes a video packet and writes samples to the track
|
||||
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
|
||||
if videoTrack == nil {
|
||||
// processVideoPacket processes a video packet and writes samples to the broadcaster
|
||||
func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster *TrackBroadcaster, config models.Config) {
|
||||
if videoBroadcaster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -735,35 +872,61 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
|
||||
|
||||
if state.lastVideoSample != nil {
|
||||
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
|
||||
|
||||
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
|
||||
}
|
||||
videoBroadcaster.WriteSample(*state.lastVideoSample)
|
||||
}
|
||||
|
||||
state.lastVideoSample = &sample
|
||||
}
|
||||
|
||||
// processAudioPacket processes an audio packet and writes samples to the track
|
||||
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
|
||||
if audioTrack == nil {
|
||||
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
|
||||
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
|
||||
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
|
||||
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
|
||||
if audioBroadcaster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if hasAAC {
|
||||
// AAC transcoding not yet implemented
|
||||
// TODO: Implement AAC to PCM_MULAW transcoding
|
||||
return
|
||||
state.audioPacketsSeen++
|
||||
|
||||
audioData := pkt.Data
|
||||
|
||||
if pkt.Codec == "AAC" {
|
||||
state.aacPacketsSeen++
|
||||
if transcoder == nil {
|
||||
state.aacErrors++
|
||||
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
|
||||
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
|
||||
}
|
||||
return // no transcoder – silently drop
|
||||
}
|
||||
pcmu, err := transcoder.Transcode(pkt.Data)
|
||||
if err != nil {
|
||||
state.aacErrors++
|
||||
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
|
||||
return
|
||||
}
|
||||
if len(pcmu) == 0 {
|
||||
state.aacNoOutput++
|
||||
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
|
||||
}
|
||||
return // decoder still buffering
|
||||
}
|
||||
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
|
||||
}
|
||||
audioData = pcmu
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if state.lastAudioSample != nil {
|
||||
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
|
||||
|
||||
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
|
||||
state.audioSamplesSent++
|
||||
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
|
||||
}
|
||||
audioBroadcaster.WriteSample(*state.lastAudioSample)
|
||||
}
|
||||
|
||||
state.lastAudioSample = &sample
|
||||
@@ -778,13 +941,13 @@ func shouldDropPacketForLatency(pkt packets.Packet) bool {
|
||||
return age > maxLivePacketAge
|
||||
}
|
||||
|
||||
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
|
||||
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, rtspClient capture.RTSPClient) {
|
||||
|
||||
config := configuration.Config
|
||||
|
||||
// Check if at least one track is available
|
||||
if videoTrack == nil && audioTrack == nil {
|
||||
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
|
||||
// Check if at least one broadcaster is available
|
||||
if videoBroadcaster == nil && audioBroadcaster == nil {
|
||||
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio broadcasters are nil, cannot proceed")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -796,8 +959,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
return
|
||||
}
|
||||
|
||||
// Create AAC transcoder if needed (AAC → G.711 µ-law).
|
||||
var aacTranscoder *AACTranscoder
|
||||
if codecs.hasAAC && audioBroadcaster != nil {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
|
||||
t, err := NewAACTranscoder()
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
|
||||
} else {
|
||||
aacTranscoder = t
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
|
||||
defer aacTranscoder.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if config.Capture.TranscodingWebRTC == "true" {
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
|
||||
}
|
||||
|
||||
// Initialize streaming state
|
||||
@@ -807,7 +984,13 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
}
|
||||
|
||||
defer func() {
|
||||
writeFinalSamples(state, videoTrack, audioTrack)
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
|
||||
if audioBroadcaster == nil {
|
||||
return 0
|
||||
}
|
||||
return audioBroadcaster.PeerCount()
|
||||
}()))
|
||||
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
|
||||
}()
|
||||
|
||||
@@ -873,9 +1056,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
|
||||
// Process video or audio packets
|
||||
if pkt.IsVideo {
|
||||
processVideoPacket(pkt, state, videoTrack, config)
|
||||
processVideoPacket(pkt, state, videoBroadcaster, config)
|
||||
} else if pkt.IsAudio {
|
||||
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
|
||||
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user