Compare commits

...

16 Commits

Author SHA1 Message Date
Cédric Verstraeten
8657765e5d Merge pull request #262 from kerberos-io/feature/concurrency-webrtc
feature/concurrency-webrtc
2026-03-09 21:37:04 +01:00
Cédric Verstraeten
76a136abc9 Add trailing commas to fallback calls
Add trailing commas to the arguments passed to fallbackToSDLiveview in ui/src/pages/Dashboard/Dashboard.jsx. This is a non-functional formatting change applied to the WebRTC initialization, ICE candidate handling, and connection-state fallback calls to align with the project's code style/formatter.
2026-03-09 21:34:17 +01:00
Cédric Verstraeten
5475b79459 Remove extraneous trailing commas in Dashboard
Clean up trailing commas and minor formatting in ui/src/pages/Dashboard/Dashboard.jsx. Adjusts object/argument commas and formatting around WebRTC message handling, peer connection setup, error handling, and SD liveview fallback callbacks to avoid potential syntax/lint issues.
2026-03-09 21:32:10 +01:00
Cédric Verstraeten
2ad768780f Format Dashboard.jsx: add trailing commas
Apply consistent formatting to ui/src/pages/Dashboard/Dashboard.jsx by adding trailing commas in object literals, function call argument lists, and callbacks (primarily around WebRTC handling and error messages). This is a non-functional style change to match the project's code style (e.g., Prettier/ESLint) and should not affect runtime behavior.
2026-03-09 21:29:30 +01:00
Cédric Verstraeten
f64b5fb65b Replace uuidv4 with local createUUID
Remove the uuidv4 import and introduce a local createUUID helper that uses window.crypto.randomUUID when available and falls back to a v4-style generator. Update webrtcClientId and webrtcSessionId to use createUUID(), removing the external dependency while preserving UUID generation for WebRTC session/client IDs.
2026-03-09 21:27:01 +01:00
Cédric Verstraeten
bb773316a2 Add trailing commas and tidy Media.scss
Add trailing commas to multi-line function calls and RTCPeerConnection instantiation in Dashboard.jsx for consistent formatting. In Media.scss remove an extra blank line and relocate the .media-filters__field:first-child rule to consolidate related styles. Purely stylistic/organizational changes with no intended behavior change.
2026-03-09 21:23:16 +01:00
Cédric Verstraeten
fc6fa9d425 Format Media.jsx and add newline
Reformat code in ui/src/pages/Media/Media.jsx for readability: wrap long argument lists (getTimestampFromInput, buildEventFilter) and reflow the appliedFilter ternary onto multiple lines. Also add the missing trailing newline to ui/public/locales/en/translation.json. No functional changes.
2026-03-09 21:18:52 +01:00
Cédric Verstraeten
aa183ee0fb Enable MQTT persistent sessions and resume subs
Switch MQTT client to persistent sessions by setting CleanSession to false, enabling ResumeSubs and using an in-memory store. Previously CleanSession was true and resume/store were commented out, which could drop subscriptions on reconnect; these changes ensure subscriptions are preserved across reconnects and re-subscribed from memory.
2026-03-09 21:12:22 +01:00
Cédric Verstraeten
730b1b2a40 Add WebRTC liveview signaling and UI fallback
Introduce structured WebRTC handshake signaling and client-side fallbacks. Changes:

- machinery: replace HandleLiveHDHandshake channel to carry LiveHDHandshake (payload + signaling callbacks) and expose active WebRTC reader count in dashboard data.
- routers: MQTT and WebSocket handlers now send/receive LiveHDHandshake structs; websocket supports stream-hd and webrtc-candidate messages and uses callback-based signaling to reply over the WS connection.
- webrtc: add helper functions to send MQTT or callback answers/candidates, adapt InitializeWebRTCConnection to the new handshake type, and expose GetActivePeerConnectionCount.
- utils: minor GetMediaFormatted filtering fix and unit test for timestamp range behavior.
- ui: Dashboard gains native WebRTC liveview with fallback to SD image stream, shows active listener count, and handles signaling/candidates; Media page adds datetime range filters, infinite-scroll append behavior, and styles; reducer/action updates to support appending events; package.json scripts disable ESLint plugin during start/build/test.

These changes enable browser-based HD liveviews with dual signaling paths (websocket callbacks or MQTT), improve media filtering, and provide graceful fallback to SD streaming when WebRTC fails.
2026-03-09 21:10:18 +01:00
Cédric Verstraeten
4efc80fecb Enhance WebRTC signaling robustness
Increase HD handshake channel buffer and harden signaling flow: enlarge HandleLiveHDHandshake buffer from 10 to 100 and add a nil-check to drop and log requests when the channel is not initialized. Add publishSignalingMessageAsync to publish MQTT messages with timeout and error logging, and replace blocking Publish().Wait() calls for ICE candidates and SDP answers with the async publisher. Reintroduce the remote-candidate processor goroutine after remote description handling to avoid AddICECandidate races. These changes reduce blocking, improve error handling, and make WebRTC/MQTT signaling more resilient.
2026-03-09 20:05:00 +01:00
Cédric Verstraeten
4fbee60e9f Merge pull request #261 from kerberos-io/feature/add-webrtc-aac-transcoder
feature/add-webrtc-aac-transcoder
2026-03-09 17:46:17 +01:00
Cédric Verstraeten
d6c25df280 Add missing imports for strconv and strings in AAC transcoder stub 2026-03-09 16:42:42 +00:00
Cédric Verstraeten
72a2d28e1e Update aac_transcoder_stub.go 2026-03-09 17:41:54 +01:00
Cédric Verstraeten
eb0972084f Implement AAC transcoding for WebRTC using FFmpeg; update Dockerfiles and launch configuration 2026-03-09 16:34:52 +00:00
Cédric Verstraeten
41a1d221fc Merge pull request #260 from kerberos-io/fix/set-clean-state
fix/set-clean-state
2026-03-09 16:56:36 +01:00
Cédric Verstraeten
eaacc93d2f Set MQTT clean session to true and disable resume subscriptions 2026-03-09 15:50:40 +00:00
18 changed files with 1604 additions and 179 deletions

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -21,6 +21,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/packets"
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
"github.com/tevino/abool"
)
@@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
@@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
// The total number of recordings stored in the directory.
recordingDirectory := configDirectory + "/data/recordings"
numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory)
activeWebRTCReaders := webrtc.GetActivePeerConnectionCount()
pendingWebRTCHandshakes := 0
if communication.HandleLiveHDHandshake != nil {
pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake)
}
// All days stored in this agent.
days := []string{}
@@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
"cameraOnline": cameraIsOnline,
"cloudOnline": cloudIsOnline,
"numberOfRecordings": numberOfRecordings,
"webrtcReaders": activeWebRTCReaders,
"webrtcPending": pendingWebRTCHandshakes,
"days": days,
"latestEvents": latestEvents,
})

View File

@@ -8,6 +8,17 @@ import (
"github.com/tevino/abool"
)
type LiveHDSignalingCallbacks struct {
SendAnswer func(sessionID string, sdp string) error
SendCandidate func(sessionID string, candidate string) error
SendError func(sessionID string, message string) error
}
type LiveHDHandshake struct {
Payload RequestHDStreamPayload
Signaling *LiveHDSignalingCallbacks
}
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
@@ -27,7 +38,7 @@ type Communication struct {
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDHandshake chan LiveHDHandshake
HandleLiveHDPeers chan string
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool

View File

@@ -90,6 +90,7 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
//opts.SetCleanSession(true)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
@@ -537,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
if communication.CameraConnected {
// Set the Hub key, so we can send back the answer.
requestHDStreamPayload.HubKey = hubKey
select {
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
default:
if communication.HandleLiveHDHandshake == nil {
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
return
}
communication.HandleLiveHDHandshake <- models.LiveHDHandshake{
Payload: requestHDStreamPayload,
}
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
} else {

View File

@@ -6,6 +6,7 @@ import (
"image"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -14,6 +15,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
type Message struct {
@@ -28,6 +30,23 @@ type Connection struct {
Cancels map[string]context.CancelFunc
}
func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) {
if connection == nil {
return
}
if err := connection.WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-error",
Message: map[string]string{
"session_id": sessionID,
"message": errorMessage,
},
}); err != nil {
log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error())
}
}
// Concurrency handling - sending messages
func (c *Connection) WriteJson(message Message) error {
c.mu.Lock()
@@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
}
}
case "stream-hd":
sessionID := message.Message["session_id"]
sessionDescription := message.Message["sdp"]
if sessionID == "" || sessionDescription == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
if communication.HandleLiveHDHandshake == nil {
writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available")
break
}
handshake := models.LiveHDHandshake{
Payload: models.RequestHDStreamPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
SessionDescription: sessionDescription,
},
Signaling: &models.LiveHDSignalingCallbacks{
SendAnswer: func(callbackSessionID string, sdp string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-answer",
Message: map[string]string{
"session_id": callbackSessionID,
"sdp": sdp,
},
})
},
SendCandidate: func(callbackSessionID string, candidate string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-candidate",
Message: map[string]string{
"session_id": callbackSessionID,
"candidate": candidate,
},
})
},
SendError: func(callbackSessionID string, errorMessage string) error {
writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage)
return nil
},
},
}
communication.HandleLiveHDHandshake <- handshake
case "webrtc-candidate":
sessionID := message.Message["session_id"]
candidate := message.Message["candidate"]
if sessionID == "" || candidate == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
key := configuration.Config.Key + "/" + sessionID
go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
Candidate: candidate,
})
}
err = conn.ReadJSON(&message)

View File

@@ -27,7 +27,8 @@ import (
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
// and is overridden at build time via:
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
//
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
var VERSION = "0.0.0"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@@ -198,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura
timestampInt, err := strconv.ParseInt(timestamp, 10, 64)
if err == nil {
if eventFilter.TimestampOffsetStart > 0 {
// TimestampOffsetStart represents the newest lower bound to include.
if timestampInt < eventFilter.TimestampOffsetStart {
continue
}
}
// If we have an offset we will check if we should skip or not
if eventFilter.TimestampOffsetEnd > 0 {
// Medias are sorted from new to older. TimestampOffsetEnd holds the oldest

View File

@@ -0,0 +1,54 @@
package utils
import (
"os"
"testing"
"time"
"github.com/kerberos-io/agent/machinery/src/models"
)
type stubFileInfo struct {
name string
}
func (s stubFileInfo) Name() string { return s.name }
func (s stubFileInfo) Size() int64 { return 0 }
func (s stubFileInfo) Mode() os.FileMode { return 0 }
func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) }
func (s stubFileInfo) IsDir() bool { return false }
func (s stubFileInfo) Sys() interface{} { return nil }
func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) {
configuration := &models.Configuration{}
configuration.Config.Timezone = "UTC"
configuration.Config.Name = "Front Door"
configuration.Config.Key = "camera-1"
files := []os.FileInfo{
stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"},
}
media := GetMediaFormatted(files, "/tmp/recordings", configuration, models.EventFilter{
TimestampOffsetStart: 1700000050,
TimestampOffsetEnd: 1700000200,
NumberOfElements: 10,
})
if len(media) != 1 {
t.Fatalf("expected 1 media item in time range, got %d", len(media))
}
if media[0].Timestamp != "1700000100" {
t.Fatalf("expected timestamp 1700000100, got %s", media[0].Timestamp)
}
if media[0].CameraName != "Front Door" {
t.Fatalf("expected camera name to be preserved, got %s", media[0].CameraName)
}
if media[0].CameraKey != "camera-1" {
t.Fatalf("expected camera key to be preserved, got %s", media[0].CameraKey)
}
}

View File

@@ -0,0 +1,270 @@
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg
package webrtc
/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations
#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>
// ── Transcoder handle ───────────────────────────────────────────────────
typedef struct {
AVCodecContext *codec_ctx;
AVCodecParserContext *parser;
SwrContext *swr_ctx;
AVFrame *frame;
AVPacket *pkt;
int swr_initialized;
int in_sample_rate;
int in_channels;
} aac_transcoder_t;
// ── Create / Destroy ────────────────────────────────────────────────────
static aac_transcoder_t* aac_transcoder_create(void) {
const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
if (!codec) return NULL;
aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
if (!t) return NULL;
t->codec_ctx = avcodec_alloc_context3(codec);
if (!t->codec_ctx) { free(t); return NULL; }
if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->parser = av_parser_init(AV_CODEC_ID_AAC);
if (!t->parser) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->frame = av_frame_alloc();
t->pkt = av_packet_alloc();
if (!t->frame || !t->pkt) {
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
av_parser_close(t->parser);
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
return t;
}
static void aac_transcoder_destroy(aac_transcoder_t *t) {
if (!t) return;
if (t->swr_ctx) swr_free(&t->swr_ctx);
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
if (t->parser) av_parser_close(t->parser);
if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
free(t);
}
// ── Lazy resampler init (called after the first decoded frame) ──────────
static int aac_init_swr(aac_transcoder_t *t) {
int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
if (in_ch_layout == 0)
in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
t->swr_ctx = swr_alloc_set_opts(
NULL,
AV_CH_LAYOUT_MONO, // out: mono
AV_SAMPLE_FMT_S16, // out: signed 16-bit
8000, // out: 8 kHz
in_ch_layout, // in: from decoder
t->codec_ctx->sample_fmt, // in: from decoder
t->codec_ctx->sample_rate, // in: from decoder
0, NULL);
if (!t->swr_ctx) return -1;
if (swr_init(t->swr_ctx) < 0) {
swr_free(&t->swr_ctx);
return -1;
}
t->in_sample_rate = t->codec_ctx->sample_rate;
t->in_channels = t->codec_ctx->channels;
t->swr_initialized = 1;
return 0;
}
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Caller must free *out_pcm with av_free() when non-NULL.
static int aac_transcode_to_pcm(aac_transcoder_t *t,
const uint8_t *data, int data_size,
uint8_t **out_pcm, int *out_size) {
*out_pcm = NULL;
*out_size = 0;
if (!data || data_size <= 0) return 0;
int buf_cap = 8192;
uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
if (!buf) return -1;
int buf_len = 0;
while (data_size > 0) {
uint8_t *pout = NULL;
int pout_size = 0;
int used = av_parser_parse2(t->parser, t->codec_ctx,
&pout, &pout_size,
data, data_size,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
if (used < 0) break;
data += used;
data_size -= used;
if (pout_size == 0) continue;
// Feed parsed frame to decoder
t->pkt->data = pout;
t->pkt->size = pout_size;
if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
// Pull all decoded frames
while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
if (!t->swr_initialized) {
if (aac_init_swr(t) < 0) {
av_frame_unref(t->frame);
av_free(buf);
return -1;
}
}
int out_samples = swr_get_out_samples(t->swr_ctx,
t->frame->nb_samples);
if (out_samples <= 0) out_samples = t->frame->nb_samples;
int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
if (needed > buf_cap) {
buf_cap = needed * 2;
uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
buf = tmp;
}
uint8_t *dst = buf + buf_len;
int converted = swr_convert(t->swr_ctx,
&dst, out_samples,
(const uint8_t**)t->frame->extended_data,
t->frame->nb_samples);
if (converted > 0)
buf_len += converted * 2;
av_frame_unref(t->frame);
}
}
if (buf_len == 0) {
av_free(buf);
return 0;
}
*out_pcm = buf;
*out_size = buf_len;
return 0;
}
*/
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/zaf/g711"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is compiled in (requires the "ffmpeg" build tag).
func AACTranscodingAvailable() bool { return true }
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
type AACTranscoder struct {
handle *C.aac_transcoder_t
}
// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
func NewAACTranscoder() (*AACTranscoder, error) {
h := C.aac_transcoder_create()
if h == nil {
return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
}
log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
return &AACTranscoder{handle: h}, nil
}
// Transcode converts an ADTS buffer (one or more AAC frames) into
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || t.handle == nil || len(adtsData) == 0 {
return nil, nil
}
var outPCM *C.uint8_t
var outSize C.int
ret := C.aac_transcode_to_pcm(
t.handle,
(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
C.int(len(adtsData)),
&outPCM, &outSize,
)
if ret < 0 {
return nil, errors.New("AAC decode/resample failed")
}
if outSize == 0 || outPCM == nil {
return nil, nil // decoder buffering, no output yet
}
defer C.av_free(unsafe.Pointer(outPCM))
// Copy S16LE PCM to Go slice, then encode to µ-law.
pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
ulaw := g711.EncodeUlaw(pcm)
// Log resampler details once.
if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
log.Log.Info(fmt.Sprintf(
"webrtc.aac_transcoder: first output resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
int(t.handle.in_sample_rate), int(t.handle.in_channels)))
// Prevent repeated logging by zeroing the field we check.
t.handle.in_sample_rate = 0
}
return ulaw, nil
}
// Close releases all FFmpeg resources held by the transcoder.
func (t *AACTranscoder) Close() {
if t != nil && t.handle != nil {
C.aac_transcoder_destroy(t.handle)
t.handle = nil
log.Log.Info("webrtc.aac_transcoder: transcoder closed")
}
}

View File

@@ -0,0 +1,205 @@
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg
package webrtc
import (
"bytes"
"errors"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/kerberos-io/agent/machinery/src/log"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is available in the current runtime.
func AACTranscodingAvailable() bool {
_, err := exec.LookPath("ffmpeg")
return err == nil
}
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
type AACTranscoder struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderrBuf bytes.Buffer
mu sync.Mutex
outMu sync.Mutex
outBuf bytes.Buffer
closed bool
closeOnce sync.Once
}
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
func NewAACTranscoder() (*AACTranscoder, error) {
ffmpegPath, err := exec.LookPath("ffmpeg")
if err != nil {
return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
}
log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)
cmd := exec.Command(
ffmpegPath,
"-hide_banner",
"-loglevel", "error",
"-fflags", "+nobuffer",
"-flags", "low_delay",
"-f", "aac",
"-i", "pipe:0",
"-vn",
"-ac", "1",
"-ar", "8000",
"-acodec", "pcm_mulaw",
"-f", "mulaw",
"pipe:1",
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmd.Stderr = &bytes.Buffer{}
if err := cmd.Start(); err != nil {
return nil, err
}
t := &AACTranscoder{
cmd: cmd,
stdin: stdin,
stdout: stdout,
}
if stderrBuf, ok := cmd.Stderr.(*bytes.Buffer); ok {
t.stderrBuf = *stderrBuf
}
go func() {
buf := make([]byte, 4096)
for {
n, readErr := stdout.Read(buf)
if n > 0 {
t.outMu.Lock()
_, _ = t.outBuf.Write(buf[:n])
buffered := t.outBuf.Len()
t.outMu.Unlock()
if buffered <= 8192 || buffered%16000 == 0 {
log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
}
}
if readErr != nil {
if readErr != io.EOF {
log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
}
return
}
}
}()
log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
return t, nil
}
// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || len(adtsData) == 0 {
return nil, nil
}
t.mu.Lock()
defer t.mu.Unlock()
if t.closed {
return nil, errors.New("AAC transcoder is closed")
}
if _, err := t.stdin.Write(adtsData); err != nil {
return nil, err
}
if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
}
deadline := time.Now().Add(75 * time.Millisecond)
for {
data := t.readAvailable()
if len(data) > 0 {
log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
return data, nil
}
if time.Now().After(deadline) {
if stderr := t.stderrString(); stderr != "" {
log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
} else {
log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
}
return nil, nil
}
time.Sleep(5 * time.Millisecond)
}
}
func (t *AACTranscoder) readAvailable() []byte {
t.outMu.Lock()
defer t.outMu.Unlock()
if t.outBuf.Len() == 0 {
return nil
}
out := make([]byte, t.outBuf.Len())
copy(out, t.outBuf.Bytes())
t.outBuf.Reset()
return out
}
func (t *AACTranscoder) stderrString() string {
if t == nil {
return ""
}
if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
return strings.TrimSpace(stderrBuf.String())
}
return strings.TrimSpace(t.stderrBuf.String())
}
// Close stops the ffmpeg subprocess.
func (t *AACTranscoder) Close() {
if t == nil {
return
}
t.closeOnce.Do(func() {
t.mu.Lock()
t.closed = true
if t.stdin != nil {
_ = t.stdin.Close()
}
t.mu.Unlock()
if t.stdout != nil {
_ = t.stdout.Close()
}
if t.cmd != nil {
_ = t.cmd.Process.Kill()
_, _ = t.cmd.Process.Wait()
if stderr := t.stderrString(); stderr != "" {
log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
}
}
})
}

View File

@@ -4,13 +4,14 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
//"github.com/izern/go-fdkaac/fdkaac"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -139,6 +140,11 @@ func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
return atomic.LoadInt64(&cm.peerConnectionCount)
}
// GetActivePeerConnectionCount returns the current number of connected WebRTC readers.
func GetActivePeerConnectionCount() int64 {
return globalConnectionManager.GetPeerConnectionCount()
}
// IncrementPeerCount atomically increments the peer connection count
func (cm *ConnectionManager) IncrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, 1)
@@ -251,7 +257,79 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) {
if mqttClient == nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description)
return
}
token := mqttClient.Publish(topic, 2, false, payload)
go func() {
if !token.WaitTimeout(5 * time.Second) {
log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description)
return
}
if err := token.Error(); err != nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error())
}
}()
}
func sendCandidateSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, candidateJSON []byte) {
if handshake.Signaling != nil && handshake.Signaling.SendCandidate != nil {
if err := handshake.Signaling.SendCandidate(handshake.Payload.SessionID, string(candidateJSON)); err != nil {
log.Log.Error("webrtc.main.sendCandidateSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"candidate": string(candidateJSON),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendCandidateSignal(): while packaging mqtt message: " + err.Error())
}
}
func sendAnswerSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, answer pionWebRTC.SessionDescription) {
encodedAnswer := base64.StdEncoding.EncodeToString([]byte(answer.SDP))
if handshake.Signaling != nil && handshake.Signaling.SendAnswer != nil {
if err := handshake.Signaling.SendAnswer(handshake.Payload.SessionID, encodedAnswer); err != nil {
log.Log.Error("webrtc.main.sendAnswerSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"sdp": []byte(encodedAnswer),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendAnswerSignal(): while packaging mqtt message: " + err.Error())
}
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.LiveHDHandshake) {
config := configuration.Config
deviceKey := config.Key
@@ -259,14 +337,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
turnServers := []string{config.TURNURI}
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword
handshakePayload := handshake.Payload
// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
sessionKey := config.Key + "/" + handshakePayload.SessionID
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
// Set variables
hubKey := handshake.HubKey
sessionDescription := handshake.SessionDescription
hubKey := handshakePayload.HubKey
sessionDescription := handshakePayload.SessionDescription
// Create WebRTC object
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
@@ -423,12 +502,12 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshake.SessionID + ")")
" (session: " + handshakePayload.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshake.SessionID + ")")
" (session: " + handshakePayload.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected:
@@ -437,9 +516,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
disconnectGracePeriod.String() + " for recovery (session: " + handshakePayload.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshakePayload.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
@@ -471,7 +550,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshakePayload.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
@@ -482,33 +561,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Iterate over the candidates and send them to the remote client
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
@@ -556,27 +608,13 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candateBinary)
valueMap["session_id"] = handshake.SessionID
valueMap["session_id"] = handshakePayload.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
}
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
sendCandidateSignal(configuration, mqttClient, hubKey, handshake, candateBinary)
})
offer := w.CreateOffer(sd)
@@ -586,6 +624,35 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
return
}
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshakePayload.SessionID)
}()
// Process remote candidates only after the remote description is set.
// MQTT can deliver candidates before the SDP offer handling completes,
// and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds.
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
@@ -600,27 +667,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
sendAnswerSignal(configuration, mqttClient, hubKey, handshake, answer)
}
} else {
globalConnectionManager.CloseCandidateChannel(sessionKey)
@@ -640,7 +689,12 @@ func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
}
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
var audioCodecNames []string
hasAAC := false
for _, s := range streams {
if s.IsAudio {
audioCodecNames = append(audioCodecNames, s.Name)
}
switch s.Name {
case "OPUS":
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
@@ -648,9 +702,18 @@ func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
case "PCM_ALAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
case "AAC":
hasAAC = true
}
}
log.Log.Error("webrtc.main.NewAudioBroadcaster(): no supported audio codec found")
if hasAAC {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
} else {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
}
return nil
}
@@ -666,18 +729,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
var mimeType string
var audioCodecNames []string
hasAAC := false
for _, stream := range streams {
if stream.IsAudio {
audioCodecNames = append(audioCodecNames, stream.Name)
}
if stream.Name == "OPUS" {
mimeType = pionWebRTC.MimeTypeOpus
} else if stream.Name == "PCM_MULAW" {
mimeType = pionWebRTC.MimeTypePCMU
} else if stream.Name == "PCM_ALAW" {
mimeType = pionWebRTC.MimeTypePCMA
} else if stream.Name == "AAC" {
hasAAC = true
}
}
if mimeType == "" {
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
return nil
if hasAAC {
mimeType = pionWebRTC.MimeTypePCMU
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
return nil
} else {
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
return nil
}
}
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
if err != nil {
@@ -696,6 +774,11 @@ type streamState struct {
receivedKeyFrame bool
lastAudioSample *pionMedia.Sample
lastVideoSample *pionMedia.Sample
audioPacketsSeen int64
aacPacketsSeen int64
audioSamplesSent int64
aacNoOutput int64
aacErrors int64
}
// codecSupport tracks which codecs are available in the stream
@@ -843,22 +926,54 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster
state.lastVideoSample = &sample
}
// processAudioPacket processes an audio packet and writes samples to the broadcaster
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, hasAAC bool) {
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
if audioBroadcaster == nil {
return
}
if hasAAC {
// AAC transcoding not yet implemented
// TODO: Implement AAC to PCM_MULAW transcoding
return
state.audioPacketsSeen++
audioData := pkt.Data
if pkt.Codec == "AAC" {
state.aacPacketsSeen++
if transcoder == nil {
state.aacErrors++
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
}
return // no transcoder silently drop
}
pcmu, err := transcoder.Transcode(pkt.Data)
if err != nil {
state.aacErrors++
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
return
}
if len(pcmu) == 0 {
state.aacNoOutput++
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
}
return // decoder still buffering
}
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
}
audioData = pcmu
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
if state.lastAudioSample != nil {
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
state.audioSamplesSent++
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
}
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
@@ -892,8 +1007,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
return
}
// Create AAC transcoder if needed (AAC → G.711 µ-law).
var aacTranscoder *AACTranscoder
if codecs.hasAAC && audioBroadcaster != nil {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
t, err := NewAACTranscoder()
if err != nil {
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
} else {
aacTranscoder = t
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
defer aacTranscoder.Close()
}
}
if config.Capture.TranscodingWebRTC == "true" {
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
}
// Initialize streaming state
@@ -903,6 +1032,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}
defer func() {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
if audioBroadcaster == nil {
return 0
}
return audioBroadcaster.PeerCount()
}()))
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
@@ -971,7 +1106,7 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
if pkt.IsVideo {
processVideoPacket(pkt, state, videoBroadcaster, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioBroadcaster, codecs.hasAAC)
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
}
}
}

View File

@@ -45,9 +45,9 @@
"crypto": false
},
"scripts": {
"start": "react-scripts start",
"build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "react-scripts test",
"start": "DISABLE_ESLINT_PLUGIN=true react-scripts start",
"build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "DISABLE_ESLINT_PLUGIN=true react-scripts test",
"eject": "react-scripts eject",
"lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'",
"format": "prettier --write \"**/*.{js,jsx,json,md}\""

View File

@@ -237,4 +237,4 @@
"remove_after_upload_enabled": "Enable delete on upload"
}
}
}
}

View File

@@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => {
};
};
export const getEvents = (eventfilter, onSuccess, onError) => {
export const getEvents = (eventfilter, onSuccess, onError, append = false) => {
return (dispatch) => {
doGetEvents(
eventfilter,
@@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => {
type: 'GET_EVENTS',
events: data.events,
filter: eventfilter,
append,
});
if (onSuccess) {
onSuccess();

View File

@@ -26,6 +26,23 @@ import {
import './Dashboard.scss';
import ReactTooltip from 'react-tooltip';
import config from '../../config';
import { getConfig } from '../../actions/agent';
function createUUID() {
if (
typeof window !== 'undefined' &&
window.crypto &&
typeof window.crypto.randomUUID === 'function'
) {
return window.crypto.randomUUID();
}
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => {
const random = Math.floor(Math.random() * 16);
const value = char === 'x' ? random : 8 + Math.floor(random / 4);
return value.toString(16);
});
}
// eslint-disable-next-line react/prefer-stateless-function
class Dashboard extends React.Component {
@@ -33,48 +50,55 @@ class Dashboard extends React.Component {
super();
this.state = {
liveviewLoaded: false,
liveviewMode: 'webrtc',
open: false,
currentRecording: '',
initialised: false,
};
this.videoRef = React.createRef();
this.pendingRemoteCandidates = [];
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this);
this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this);
this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this);
this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
[this.liveviewElement] = liveview;
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
}
const { dispatchGetConfig } = this.props;
dispatchGetConfig(() => this.initialiseLiveview());
this.initialiseLiveview();
}
componentDidUpdate() {
this.initialiseLiveview();
componentDidUpdate(prevProps) {
const { images, dashboard } = this.props;
const { liveviewLoaded, liveviewMode } = this.state;
const configLoaded = this.hasAgentConfig(this.props);
const prevConfigLoaded = this.hasAgentConfig(prevProps);
if (!prevConfigLoaded && configLoaded) {
this.initialiseLiveview();
}
if (
liveviewMode === 'sd' &&
!liveviewLoaded &&
prevProps.images !== images &&
images.length > 0
) {
this.setState({
liveviewLoaded: true,
});
}
if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) {
this.initialiseLiveview();
}
}
componentWillUnmount() {
if (this.liveviewElement) {
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
this.liveviewElement = null;
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {
message_type: 'stop-sd',
};
dispatchSend(message);
}
handleLiveviewLoad() {
this.setState({
liveviewLoaded: true,
});
this.stopSDLiveview();
this.stopWebRTCLiveview();
}
handleClose() {
@@ -84,32 +108,378 @@ class Dashboard extends React.Component {
});
}
getCurrentTimestamp() {
return Math.round(Date.now() / 1000);
// eslint-disable-next-line react/sort-comp
hasAgentConfig(props) {
const currentProps = props || this.props;
const { config: configResponse } = currentProps;
return !!(configResponse && configResponse.config);
}
browserSupportsWebRTC() {
return (
typeof window !== 'undefined' &&
typeof window.RTCPeerConnection !== 'undefined'
);
}
buildPeerConnectionConfig() {
const { config: configResponse } = this.props;
const agentConfig =
configResponse && configResponse.config ? configResponse.config : {};
const iceServers = [];
if (agentConfig.stunuri) {
iceServers.push({
urls: [agentConfig.stunuri],
});
}
if (agentConfig.turnuri) {
const turnServer = {
urls: [agentConfig.turnuri],
};
if (agentConfig.turn_username) {
turnServer.username = agentConfig.turn_username;
}
if (agentConfig.turn_password) {
turnServer.credential = agentConfig.turn_password;
}
iceServers.push(turnServer);
}
return {
iceServers,
iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all',
};
}
initialiseLiveview() {
const { initialised } = this.state;
if (!initialised) {
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
const { dashboard } = this.props;
if (initialised || !dashboard.cameraOnline) {
return;
}
if (!this.hasAgentConfig()) {
return;
}
if (this.browserSupportsWebRTC()) {
this.startWebRTCLiveview();
} else {
this.fallbackToSDLiveview('WebRTC is not supported in this browser.');
}
}
initialiseSDLiveview() {
if (this.requestStreamSubscription) {
return;
}
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
dispatchSend(message);
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
this.setState({
initialised: true,
});
stopSDLiveview() {
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
dispatchSend({
message_type: 'stop-sd',
});
}
stopWebRTCLiveview() {
if (this.webrtcTimeout) {
window.clearTimeout(this.webrtcTimeout);
this.webrtcTimeout = null;
}
if (this.webrtcSocket) {
this.webrtcSocket.onopen = null;
this.webrtcSocket.onmessage = null;
this.webrtcSocket.onerror = null;
this.webrtcSocket.onclose = null;
this.webrtcSocket.close();
this.webrtcSocket = null;
}
if (this.webrtcPeerConnection) {
this.webrtcPeerConnection.ontrack = null;
this.webrtcPeerConnection.onicecandidate = null;
this.webrtcPeerConnection.onconnectionstatechange = null;
this.webrtcPeerConnection.close();
this.webrtcPeerConnection = null;
}
this.pendingRemoteCandidates = [];
this.webrtcOfferStarted = false;
this.webrtcSessionId = null;
this.webrtcClientId = null;
if (this.videoRef.current) {
this.videoRef.current.srcObject = null;
}
}
sendWebRTCMessage(messageType, message = {}) {
if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) {
return;
}
this.webrtcSocket.send(
JSON.stringify({
client_id: this.webrtcClientId,
message_type: messageType,
message,
})
);
}
async handleWebRTCSignalMessage(event) {
let data;
try {
data = JSON.parse(event.data);
} catch (error) {
return;
}
const { message_type: messageType, message = {} } = data;
const { session_id: sessionID, sdp, candidate } = message;
if (messageType === 'hello-back') {
await this.beginWebRTCLiveview();
return;
}
if (sessionID && sessionID !== this.webrtcSessionId) {
return;
}
switch (messageType) {
case 'webrtc-answer':
try {
await this.webrtcPeerConnection.setRemoteDescription({
type: 'answer',
sdp: window.atob(sdp),
});
await this.flushPendingRemoteCandidates();
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC answer: ${error.message}`
);
}
break;
case 'webrtc-candidate': {
try {
const candidateInit = JSON.parse(candidate);
if (
this.webrtcPeerConnection.remoteDescription &&
this.webrtcPeerConnection.remoteDescription.type
) {
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} else {
this.pendingRemoteCandidates.push(candidateInit);
}
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC candidate: ${error.message}`
);
}
break;
}
case 'webrtc-error':
this.fallbackToSDLiveview(
message.message || 'The agent could not start the WebRTC liveview.'
);
break;
default:
break;
}
}
async beginWebRTCLiveview() {
if (!this.webrtcPeerConnection || this.webrtcOfferStarted) {
return;
}
try {
this.webrtcOfferStarted = true;
const offer = await this.webrtcPeerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await this.webrtcPeerConnection.setLocalDescription(offer);
this.sendWebRTCMessage('stream-hd', {
session_id: this.webrtcSessionId,
sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp),
});
} catch (error) {
this.fallbackToSDLiveview(
`Unable to initialise WebRTC liveview: ${error.message}`,
);
}
}
async flushPendingRemoteCandidates() {
if (
!this.webrtcPeerConnection ||
!this.webrtcPeerConnection.remoteDescription
) {
return;
}
while (this.pendingRemoteCandidates.length > 0) {
const candidateInit = this.pendingRemoteCandidates.shift();
try {
// eslint-disable-next-line no-await-in-loop
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} catch (error) {
this.fallbackToSDLiveview(
`Unable to add remote ICE candidate: ${error.message}`,
);
return;
}
}
}
startWebRTCLiveview() {
if (this.webrtcPeerConnection || this.webrtcSocket) {
return;
}
this.stopSDLiveview();
this.webrtcClientId = createUUID();
this.webrtcSessionId = createUUID();
this.pendingRemoteCandidates = [];
this.webrtcPeerConnection = new window.RTCPeerConnection(
this.buildPeerConnectionConfig()
);
this.webrtcPeerConnection.ontrack = (event) => {
const [remoteStream] = event.streams;
if (this.videoRef.current && remoteStream) {
this.videoRef.current.srcObject = remoteStream;
const playPromise = this.videoRef.current.play();
if (playPromise && playPromise.catch) {
playPromise.catch(() => {});
}
}
this.setState({
liveviewLoaded: true,
});
};
this.webrtcPeerConnection.onicecandidate = (event) => {
if (!event.candidate) {
return;
}
this.sendWebRTCMessage('webrtc-candidate', {
session_id: this.webrtcSessionId,
candidate: JSON.stringify(event.candidate.toJSON()),
});
};
this.webrtcPeerConnection.onconnectionstatechange = () => {
const { connectionState } = this.webrtcPeerConnection;
if (connectionState === 'connected') {
this.setState({
liveviewLoaded: true,
});
}
if (
connectionState === 'failed' ||
connectionState === 'disconnected' ||
connectionState === 'closed'
) {
this.fallbackToSDLiveview(
`WebRTC connection ${connectionState}, falling back to SD liveview.`,
);
}
};
this.webrtcSocket = new WebSocket(config.WS_URL);
this.webrtcSocket.onopen = () => {
this.sendWebRTCMessage('hello', {});
};
this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage;
this.webrtcSocket.onerror = () => {
this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.');
};
this.webrtcSocket.onclose = () => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview('WebRTC signaling channel closed early.');
}
};
this.webrtcTimeout = window.setTimeout(() => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview(
'WebRTC connection timed out, falling back to SD liveview.'
);
}
}, 10000);
this.setState({
initialised: true,
liveviewLoaded: false,
liveviewMode: 'webrtc',
});
}
fallbackToSDLiveview(errorMessage) {
const { liveviewMode } = this.state;
if (liveviewMode === 'sd' && this.requestStreamSubscription) {
return;
}
this.stopWebRTCLiveview();
if (errorMessage) {
// eslint-disable-next-line no-console
console.warn(errorMessage);
}
this.setState(
{
initialised: true,
liveviewLoaded: false,
liveviewMode: 'sd',
},
() => {
this.initialiseSDLiveview();
}
);
}
openModal(file) {
@@ -121,7 +491,8 @@ class Dashboard extends React.Component {
render() {
const { dashboard, t, images } = this.props;
const { liveviewLoaded, open, currentRecording } = this.state;
const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state;
const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0;
// We check if the camera was getting a valid frame
// during the last 5 seconds, otherwise we assume the camera is offline.
@@ -175,7 +546,6 @@ class Dashboard extends React.Component {
divider="0"
footer={t('dashboard.total_recordings')}
/>
<Link to="/settings">
<Card
title="IP Camera"
@@ -314,7 +684,9 @@ class Dashboard extends React.Component {
)}
</div>
<div>
<h2>{t('dashboard.live_view')}</h2>
<h2>
{t('dashboard.live_view')} ({listenerCount})
</h2>
{(!liveviewLoaded || !isCameraOnline) && (
<SetupBox
btnicon="preferences"
@@ -331,12 +703,16 @@ class Dashboard extends React.Component {
liveviewLoaded && isCameraOnline ? 'visible' : 'hidden',
}}
>
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
{liveviewMode === 'webrtc' ? (
<video ref={this.videoRef} autoPlay muted playsInline />
) : (
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
)}
</div>
</div>
</div>
@@ -348,20 +724,25 @@ class Dashboard extends React.Component {
const mapStateToProps = (state /* , ownProps */) => ({
dashboard: state.agent.dashboard,
config: state.agent.config,
connected: state.wss.connected,
images: state.wss.images,
});
const mapDispatchToProps = (dispatch) => ({
dispatchSend: (message) => dispatch(send(message)),
dispatchGetConfig: (onSuccess, onError) =>
dispatch(getConfig(onSuccess, onError)),
});
Dashboard.propTypes = {
dashboard: PropTypes.object.isRequired,
config: PropTypes.object.isRequired,
connected: PropTypes.bool.isRequired,
images: PropTypes.array.isRequired,
t: PropTypes.func.isRequired,
dispatchSend: PropTypes.func.isRequired,
dispatchGetConfig: PropTypes.func.isRequired,
};
export default withTranslation()(

View File

@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import { withTranslation } from 'react-i18next';
import {
Breadcrumb,
ControlBar,
VideoCard,
Button,
Modal,
@@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent';
import config from '../../config';
import './Media.scss';
function formatDateTimeLocal(date) {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
return `${year}-${month}-${day}T${hours}:${minutes}`;
}
function getDefaultTimeWindow() {
const endDate = new Date();
const startDate = new Date(endDate.getTime() - 60 * 60 * 1000);
return {
startDateTime: formatDateTimeLocal(startDate),
endDateTime: formatDateTimeLocal(endDate),
timestamp_offset_start: Math.floor(startDate.getTime() / 1000),
timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59,
};
}
function normalizeInputValue(valueOrEvent) {
if (valueOrEvent && valueOrEvent.target) {
return valueOrEvent.target.value;
}
return valueOrEvent;
}
// eslint-disable-next-line react/prefer-stateless-function
class Media extends React.Component {
constructor() {
super();
this.state = {
timestamp_offset_start: 0,
timestamp_offset_end: 0,
const defaultTimeWindow = getDefaultTimeWindow();
const initialFilter = {
timestamp_offset_start: defaultTimeWindow.timestamp_offset_start,
timestamp_offset_end: defaultTimeWindow.timestamp_offset_end,
number_of_elements: 12,
};
this.state = {
appliedFilter: initialFilter,
startDateTime: defaultTimeWindow.startDateTime,
endDateTime: defaultTimeWindow.endDateTime,
isScrolling: false,
open: false,
currentRecording: '',
@@ -32,7 +72,8 @@ class Media extends React.Component {
componentDidMount() {
const { dispatchGetEvents } = this.props;
dispatchGetEvents(this.state);
const { appliedFilter } = this.state;
dispatchGetEvents(appliedFilter);
document.addEventListener('scroll', this.trackScrolling);
}
@@ -49,29 +90,107 @@ class Media extends React.Component {
trackScrolling = () => {
const { events, dispatchGetEvents } = this.props;
const { isScrolling } = this.state;
const { isScrolling, appliedFilter } = this.state;
const wrappedElement = document.getElementById('loader');
if (!isScrolling && this.isBottom(wrappedElement)) {
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
this.setState({
if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) {
return;
}
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
dispatchGetEvents(
{
...appliedFilter,
timestamp_offset_end: parseInt(lastElement.timestamp, 10),
});
dispatchGetEvents(this.state, () => {
},
() => {
setTimeout(() => {
this.setState({
isScrolling: false,
});
}, 1000);
});
}
},
() => {
this.setState({
isScrolling: false,
});
},
true
);
} else {
this.setState({
isScrolling: false,
});
}
};
buildEventFilter(startDateTime, endDateTime) {
const { appliedFilter } = this.state;
return {
timestamp_offset_start: this.getTimestampFromInput(
startDateTime,
'start'
),
timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'),
number_of_elements: appliedFilter.number_of_elements,
};
}
handleDateFilterChange(field, value) {
const { dispatchGetEvents } = this.props;
const { startDateTime, endDateTime } = this.state;
const normalizedValue = normalizeInputValue(value);
const nextStartDateTime =
field === 'startDateTime' ? normalizedValue : startDateTime;
const nextEndDateTime =
field === 'endDateTime' ? normalizedValue : endDateTime;
const nextFilter = this.buildEventFilter(
nextStartDateTime,
nextEndDateTime
);
const shouldApplyFilter =
(nextStartDateTime === '' || nextStartDateTime.length === 16) &&
(nextEndDateTime === '' || nextEndDateTime.length === 16);
this.setState(
{
[field]: normalizedValue,
appliedFilter: shouldApplyFilter
? nextFilter
: this.state.appliedFilter,
isScrolling: false,
},
() => {
if (shouldApplyFilter) {
dispatchGetEvents(nextFilter);
}
}
);
}
getTimestampFromInput(value, boundary) {
if (!value) {
return 0;
}
const date = new Date(value);
if (Number.isNaN(date.getTime())) {
return 0;
}
const seconds = Math.floor(date.getTime() / 1000);
if (boundary === 'end') {
return seconds + 59;
}
return seconds;
}
isBottom(el) {
return el.getBoundingClientRect().bottom + 50 <= window.innerHeight;
}
@@ -85,7 +204,9 @@ class Media extends React.Component {
render() {
const { events, eventsLoaded, t } = this.props;
const { isScrolling, open, currentRecording } = this.state;
const { isScrolling, open, currentRecording, startDateTime, endDateTime } =
this.state;
return (
<div id="media">
<Breadcrumb
@@ -102,6 +223,37 @@ class Media extends React.Component {
</Link>
</Breadcrumb>
<div className="media-control-bar">
<ControlBar>
<div className="media-filters">
<div className="media-filters__field">
<label htmlFor="recordings-start-time">Start time</label>
<input
className="media-filters__input"
id="recordings-start-time"
type="datetime-local"
value={startDateTime}
onChange={(value) =>
this.handleDateFilterChange('startDateTime', value)
}
/>
</div>
<div className="media-filters__field">
<label htmlFor="recordings-end-time">End time</label>
<input
className="media-filters__input"
id="recordings-end-time"
type="datetime-local"
value={endDateTime}
onChange={(value) =>
this.handleDateFilterChange('endDateTime', value)
}
/>
</div>
</div>
</ControlBar>
</div>
<div className="stats grid-container --four-columns">
{events.map((event) => (
<div
@@ -123,6 +275,11 @@ class Media extends React.Component {
</div>
))}
</div>
{events.length === 0 && eventsLoaded === 0 && (
<div className="media-empty-state">
No recordings found in the selected time range.
</div>
)}
{open && (
<Modal>
<ModalHeader
@@ -182,13 +339,13 @@ const mapStateToProps = (state /* , ownProps */) => ({
});
const mapDispatchToProps = (dispatch) => ({
dispatchGetEvents: (eventFilter, success, error) =>
dispatch(getEvents(eventFilter, success, error)),
dispatchGetEvents: (eventFilter, success, error, append) =>
dispatch(getEvents(eventFilter, success, error, append)),
});
Media.propTypes = {
t: PropTypes.func.isRequired,
events: PropTypes.objectOf(PropTypes.object).isRequired,
events: PropTypes.arrayOf(PropTypes.object).isRequired,
eventsLoaded: PropTypes.number.isRequired,
dispatchGetEvents: PropTypes.func.isRequired,
};

View File

@@ -1,4 +1,103 @@
#media {
.media-control-bar {
.control-bar {
display: block;
padding: 0 var(--main-content-gutter);
}
.control-bar .filtering {
display: block;
}
.control-bar .filtering > * {
border-right: 0;
flex: 1 1 100% !important;
max-width: none;
width: 100%;
}
}
.media-filters {
align-items: stretch;
display: grid;
gap: 0;
grid-template-columns: repeat(2, minmax(0, 1fr));
min-width: 0;
width: 100%;
}
.media-filters__field {
border-right: 1px solid var(--bg-muted);
min-width: 0;
padding: 16px 24px;
label {
display: block;
font-size: 14px;
font-weight: 600;
margin-bottom: 8px;
white-space: nowrap;
}
}
.media-filters__field:first-child {
padding-left: 0;
}
.media-filters__input {
appearance: none;
background: var(--white);
border: 1px solid var(--grey-light);
border-radius: 8px;
box-sizing: border-box;
color: var(--black);
font-size: 16px;
min-height: 48px;
padding: 0 14px 0 0;
width: 100%;
}
.media-filters__input::-webkit-datetime-edit,
.media-filters__input::-webkit-datetime-edit-fields-wrapper {
padding: 0;
}
.media-filters__input:focus {
border-color: var(--oss);
outline: 0;
}
.media-filters__field:first-child .media-filters__input {
padding-left: 0;
}
.media-filters__field:last-child {
border-right: 0;
padding-right: 0;
}
@media (max-width: 700px) {
.media-filters {
grid-template-columns: 1fr;
}
.media-filters__field {
border-right: 0;
border-bottom: 1px solid var(--bg-muted);
padding-left: 0;
padding-right: 0;
}
.media-filters__field:last-child {
border-bottom: 0;
}
}
.media-empty-state {
margin: 24px 0;
opacity: 0.8;
text-align: center;
}
#loader {
display: flex;

View File

@@ -123,16 +123,12 @@ const agent = (
};
case 'GET_EVENTS':
const { timestamp_offset_end } = action.filter;
const { events } = action;
return {
...state,
eventsLoaded: events.length,
events:
timestamp_offset_end === 0
? [...events]
: [...state.events, ...events],
eventfilter: action.eventfilter,
events: action.append ? [...state.events, ...events] : [...events],
eventfilter: action.filter,
};
default: