mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-10 05:52:06 +00:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8657765e5d | ||
|
|
76a136abc9 | ||
|
|
5475b79459 | ||
|
|
2ad768780f | ||
|
|
f64b5fb65b | ||
|
|
bb773316a2 | ||
|
|
fc6fa9d425 | ||
|
|
aa183ee0fb | ||
|
|
730b1b2a40 | ||
|
|
4efc80fecb | ||
|
|
4fbee60e9f | ||
|
|
d6c25df280 | ||
|
|
72a2d28e1e | ||
|
|
eb0972084f | ||
|
|
41a1d221fc | ||
|
|
eaacc93d2f |
@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
|
||||
COPY --chown=0:0 --from=build-machinery /dist /
|
||||
COPY --chown=0:0 --from=build-ui /dist /
|
||||
|
||||
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
|
||||
##################
|
||||
# Try running agent
|
||||
|
||||
@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
|
||||
COPY --chown=0:0 --from=build-machinery /dist /
|
||||
COPY --chown=0:0 --from=build-ui /dist /
|
||||
|
||||
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
|
||||
|
||||
##################
|
||||
# Try running agent
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/kerberos-io/agent/machinery/src/packets"
|
||||
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
|
||||
"github.com/kerberos-io/agent/machinery/src/utils"
|
||||
"github.com/kerberos-io/agent/machinery/src/webrtc"
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
@@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
}
|
||||
|
||||
// Handle livestream HD (high resolution over WEBRTC)
|
||||
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
|
||||
communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100)
|
||||
if subStreamEnabled {
|
||||
livestreamHDCursor := subQueue.Latest()
|
||||
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
|
||||
@@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
|
||||
// The total number of recordings stored in the directory.
|
||||
recordingDirectory := configDirectory + "/data/recordings"
|
||||
numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory)
|
||||
activeWebRTCReaders := webrtc.GetActivePeerConnectionCount()
|
||||
pendingWebRTCHandshakes := 0
|
||||
if communication.HandleLiveHDHandshake != nil {
|
||||
pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake)
|
||||
}
|
||||
|
||||
// All days stored in this agent.
|
||||
days := []string{}
|
||||
@@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
|
||||
"cameraOnline": cameraIsOnline,
|
||||
"cloudOnline": cloudIsOnline,
|
||||
"numberOfRecordings": numberOfRecordings,
|
||||
"webrtcReaders": activeWebRTCReaders,
|
||||
"webrtcPending": pendingWebRTCHandshakes,
|
||||
"days": days,
|
||||
"latestEvents": latestEvents,
|
||||
})
|
||||
|
||||
@@ -8,6 +8,17 @@ import (
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
type LiveHDSignalingCallbacks struct {
|
||||
SendAnswer func(sessionID string, sdp string) error
|
||||
SendCandidate func(sessionID string, candidate string) error
|
||||
SendError func(sessionID string, message string) error
|
||||
}
|
||||
|
||||
type LiveHDHandshake struct {
|
||||
Payload RequestHDStreamPayload
|
||||
Signaling *LiveHDSignalingCallbacks
|
||||
}
|
||||
|
||||
// The communication struct that is managing
|
||||
// all the communication between the different goroutines.
|
||||
type Communication struct {
|
||||
@@ -27,7 +38,7 @@ type Communication struct {
|
||||
HandleHeartBeat chan string
|
||||
HandleLiveSD chan int64
|
||||
HandleLiveHDKeepalive chan string
|
||||
HandleLiveHDHandshake chan RequestHDStreamPayload
|
||||
HandleLiveHDHandshake chan LiveHDHandshake
|
||||
HandleLiveHDPeers chan string
|
||||
HandleONVIF chan OnvifAction
|
||||
IsConfiguring *abool.AtomicBool
|
||||
|
||||
@@ -90,6 +90,7 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
|
||||
|
||||
// Some extra options to make sure the connection behaves
|
||||
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
|
||||
//opts.SetCleanSession(true)
|
||||
opts.SetCleanSession(false)
|
||||
opts.SetResumeSubs(true)
|
||||
opts.SetStore(mqtt.NewMemoryStore())
|
||||
@@ -537,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
|
||||
if communication.CameraConnected {
|
||||
// Set the Hub key, so we can send back the answer.
|
||||
requestHDStreamPayload.HubKey = hubKey
|
||||
select {
|
||||
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
|
||||
default:
|
||||
if communication.HandleLiveHDHandshake == nil {
|
||||
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
|
||||
return
|
||||
}
|
||||
|
||||
communication.HandleLiveHDHandshake <- models.LiveHDHandshake{
|
||||
Payload: requestHDStreamPayload,
|
||||
}
|
||||
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
|
||||
} else {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"image"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/agent/machinery/src/packets"
|
||||
"github.com/kerberos-io/agent/machinery/src/utils"
|
||||
"github.com/kerberos-io/agent/machinery/src/webrtc"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
@@ -28,6 +30,23 @@ type Connection struct {
|
||||
Cancels map[string]context.CancelFunc
|
||||
}
|
||||
|
||||
func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) {
|
||||
if connection == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := connection.WriteJson(Message{
|
||||
ClientID: clientID,
|
||||
MessageType: "webrtc-error",
|
||||
Message: map[string]string{
|
||||
"session_id": sessionID,
|
||||
"message": errorMessage,
|
||||
},
|
||||
}); err != nil {
|
||||
log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Concurrency handling - sending messages
|
||||
func (c *Connection) WriteJson(message Message) error {
|
||||
c.mu.Lock()
|
||||
@@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu
|
||||
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
|
||||
}
|
||||
}
|
||||
|
||||
case "stream-hd":
|
||||
sessionID := message.Message["session_id"]
|
||||
sessionDescription := message.Message["sdp"]
|
||||
|
||||
if sessionID == "" || sessionDescription == "" {
|
||||
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp")
|
||||
break
|
||||
}
|
||||
|
||||
if !communication.CameraConnected {
|
||||
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
|
||||
break
|
||||
}
|
||||
|
||||
if communication.HandleLiveHDHandshake == nil {
|
||||
writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available")
|
||||
break
|
||||
}
|
||||
|
||||
handshake := models.LiveHDHandshake{
|
||||
Payload: models.RequestHDStreamPayload{
|
||||
Timestamp: time.Now().Unix(),
|
||||
SessionID: sessionID,
|
||||
SessionDescription: sessionDescription,
|
||||
},
|
||||
Signaling: &models.LiveHDSignalingCallbacks{
|
||||
SendAnswer: func(callbackSessionID string, sdp string) error {
|
||||
return sockets[clientID].WriteJson(Message{
|
||||
ClientID: clientID,
|
||||
MessageType: "webrtc-answer",
|
||||
Message: map[string]string{
|
||||
"session_id": callbackSessionID,
|
||||
"sdp": sdp,
|
||||
},
|
||||
})
|
||||
},
|
||||
SendCandidate: func(callbackSessionID string, candidate string) error {
|
||||
return sockets[clientID].WriteJson(Message{
|
||||
ClientID: clientID,
|
||||
MessageType: "webrtc-candidate",
|
||||
Message: map[string]string{
|
||||
"session_id": callbackSessionID,
|
||||
"candidate": candidate,
|
||||
},
|
||||
})
|
||||
},
|
||||
SendError: func(callbackSessionID string, errorMessage string) error {
|
||||
writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
communication.HandleLiveHDHandshake <- handshake
|
||||
|
||||
case "webrtc-candidate":
|
||||
sessionID := message.Message["session_id"]
|
||||
candidate := message.Message["candidate"]
|
||||
|
||||
if sessionID == "" || candidate == "" {
|
||||
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate")
|
||||
break
|
||||
}
|
||||
|
||||
if !communication.CameraConnected {
|
||||
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
|
||||
break
|
||||
}
|
||||
|
||||
key := configuration.Config.Key + "/" + sessionID
|
||||
go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{
|
||||
Timestamp: time.Now().Unix(),
|
||||
SessionID: sessionID,
|
||||
Candidate: candidate,
|
||||
})
|
||||
}
|
||||
|
||||
err = conn.ReadJSON(&message)
|
||||
|
||||
@@ -27,7 +27,8 @@ import (
|
||||
|
||||
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
|
||||
// and is overridden at build time via:
|
||||
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
|
||||
//
|
||||
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
|
||||
var VERSION = "0.0.0"
|
||||
|
||||
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
@@ -198,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura
|
||||
timestampInt, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err == nil {
|
||||
|
||||
if eventFilter.TimestampOffsetStart > 0 {
|
||||
// TimestampOffsetStart represents the newest lower bound to include.
|
||||
if timestampInt < eventFilter.TimestampOffsetStart {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If we have an offset we will check if we should skip or not
|
||||
if eventFilter.TimestampOffsetEnd > 0 {
|
||||
// Medias are sorted from new to older. TimestampOffsetEnd holds the oldest
|
||||
|
||||
54
machinery/src/utils/main_test.go
Normal file
54
machinery/src/utils/main_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package utils

import (
	"os"
	"testing"
	"time"

	"github.com/kerberos-io/agent/machinery/src/models"
)

// stubFileInfo is a minimal os.FileInfo implementation; only the file name
// carries meaning, every other accessor yields a zero value.
type stubFileInfo struct {
	name string
}

func (s stubFileInfo) Name() string       { return s.name }
func (s stubFileInfo) Size() int64        { return 0 }
func (s stubFileInfo) Mode() os.FileMode  { return 0 }
func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) }
func (s stubFileInfo) IsDir() bool        { return false }
func (s stubFileInfo) Sys() interface{}   { return nil }

// TestGetMediaFormattedHonorsTimestampRange checks that filtering keeps only
// recordings whose filename timestamp lies within the requested window, and
// that camera metadata from the configuration is copied onto the result.
func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) {
	cfg := &models.Configuration{}
	cfg.Config.Timezone = "UTC"
	cfg.Config.Name = "Front Door"
	cfg.Config.Key = "camera-1"

	// Medias are named "<unix-timestamp>_...", newest first.
	fixtures := []os.FileInfo{
		stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"},
		stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"},
		stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"},
	}

	got := GetMediaFormatted(fixtures, "/tmp/recordings", cfg, models.EventFilter{
		TimestampOffsetStart: 1700000050,
		TimestampOffsetEnd:   1700000200,
		NumberOfElements:     10,
	})

	if len(got) != 1 {
		t.Fatalf("expected 1 media item in time range, got %d", len(got))
	}

	if got[0].Timestamp != "1700000100" {
		t.Fatalf("expected timestamp 1700000100, got %s", got[0].Timestamp)
	}

	if got[0].CameraName != "Front Door" {
		t.Fatalf("expected camera name to be preserved, got %s", got[0].CameraName)
	}
	if got[0].CameraKey != "camera-1" {
		t.Fatalf("expected camera key to be preserved, got %s", got[0].CameraKey)
	}
}
|
||||
270
machinery/src/webrtc/aac_transcoder_ffmpeg.go
Normal file
270
machinery/src/webrtc/aac_transcoder_ffmpeg.go
Normal file
@@ -0,0 +1,270 @@
|
||||
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg

package webrtc

/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations

#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>

// ── Transcoder handle ───────────────────────────────────────────────────

// Owns decoder, ADTS parser, lazily-created resampler and scratch
// frame/packet. All fields are freed by aac_transcoder_destroy.
typedef struct {
	AVCodecContext *codec_ctx;
	AVCodecParserContext *parser;
	SwrContext *swr_ctx;
	AVFrame *frame;
	AVPacket *pkt;
	int swr_initialized;   // set once the resampler is configured from the first decoded frame
	int in_sample_rate;    // input rate observed at swr init (also doubles as a "log once" flag on the Go side)
	int in_channels;
} aac_transcoder_t;

// ── Create / Destroy ────────────────────────────────────────────────────

static aac_transcoder_t* aac_transcoder_create(void) {
	const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
	if (!codec) return NULL;

	aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
	if (!t) return NULL;

	t->codec_ctx = avcodec_alloc_context3(codec);
	if (!t->codec_ctx) { free(t); return NULL; }

	if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
		avcodec_free_context(&t->codec_ctx);
		free(t);
		return NULL;
	}

	t->parser = av_parser_init(AV_CODEC_ID_AAC);
	if (!t->parser) {
		avcodec_free_context(&t->codec_ctx);
		free(t);
		return NULL;
	}

	t->frame = av_frame_alloc();
	t->pkt = av_packet_alloc();
	if (!t->frame || !t->pkt) {
		if (t->frame) av_frame_free(&t->frame);
		if (t->pkt) av_packet_free(&t->pkt);
		av_parser_close(t->parser);
		avcodec_free_context(&t->codec_ctx);
		free(t);
		return NULL;
	}

	return t;
}

static void aac_transcoder_destroy(aac_transcoder_t *t) {
	if (!t) return;
	if (t->swr_ctx) swr_free(&t->swr_ctx);
	if (t->frame) av_frame_free(&t->frame);
	if (t->pkt) av_packet_free(&t->pkt);
	if (t->parser) av_parser_close(t->parser);
	if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
	free(t);
}

// ── Lazy resampler init (called after the first decoded frame) ──────────

// NOTE(review): this uses the channel_layout/channels fields and
// swr_alloc_set_opts, which are the pre-5.1 (deprecated) FFmpeg channel
// API — hence -Wno-deprecated-declarations above. Confirm it still builds
// against the FFmpeg version shipped in the image; FFmpeg ≥ 7 may remove
// these in favour of AVChannelLayout / swr_alloc_set_opts2.
static int aac_init_swr(aac_transcoder_t *t) {
	int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
	if (in_ch_layout == 0)
		in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);

	t->swr_ctx = swr_alloc_set_opts(
		NULL,
		AV_CH_LAYOUT_MONO,        // out: mono
		AV_SAMPLE_FMT_S16,        // out: signed 16-bit
		8000,                     // out: 8 kHz
		in_ch_layout,             // in: from decoder
		t->codec_ctx->sample_fmt, // in: from decoder
		t->codec_ctx->sample_rate, // in: from decoder
		0, NULL);

	if (!t->swr_ctx) return -1;
	if (swr_init(t->swr_ctx) < 0) {
		swr_free(&t->swr_ctx);
		return -1;
	}

	t->in_sample_rate = t->codec_ctx->sample_rate;
	t->in_channels = t->codec_ctx->channels;
	t->swr_initialized = 1;
	return 0;
}

// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Caller must free *out_pcm with av_free() when non-NULL.
// Returns 0 on success (possibly with no output while the parser/decoder
// is still buffering) and -1 on allocation or resampler failure.

static int aac_transcode_to_pcm(aac_transcoder_t *t,
                                const uint8_t *data, int data_size,
                                uint8_t **out_pcm, int *out_size) {
	*out_pcm = NULL;
	*out_size = 0;
	if (!data || data_size <= 0) return 0;

	int buf_cap = 8192;
	uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
	if (!buf) return -1;
	int buf_len = 0;

	// Parse the byte stream into complete AAC frames; the parser may
	// consume input without emitting a frame (pout_size == 0).
	while (data_size > 0) {
		uint8_t *pout = NULL;
		int pout_size = 0;

		int used = av_parser_parse2(t->parser, t->codec_ctx,
		                            &pout, &pout_size,
		                            data, data_size,
		                            AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
		if (used < 0) break;
		data += used;
		data_size -= used;
		if (pout_size == 0) continue;

		// Feed parsed frame to decoder
		t->pkt->data = pout;
		t->pkt->size = pout_size;
		if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;

		// Pull all decoded frames
		while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
			// The resampler needs the decoder's actual format, which is
			// only known once the first frame has been decoded.
			if (!t->swr_initialized) {
				if (aac_init_swr(t) < 0) {
					av_frame_unref(t->frame);
					av_free(buf);
					return -1;
				}
			}

			int out_samples = swr_get_out_samples(t->swr_ctx,
			                                      t->frame->nb_samples);
			if (out_samples <= 0) out_samples = t->frame->nb_samples;

			int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
			if (needed > buf_cap) {
				// Grow geometrically to amortise reallocation cost.
				buf_cap = needed * 2;
				uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
				if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
				buf = tmp;
			}

			uint8_t *dst = buf + buf_len;
			int converted = swr_convert(t->swr_ctx,
			                            &dst, out_samples,
			                            (const uint8_t**)t->frame->extended_data,
			                            t->frame->nb_samples);
			if (converted > 0)
				buf_len += converted * 2;

			av_frame_unref(t->frame);
		}
	}

	if (buf_len == 0) {
		av_free(buf);
		return 0;
	}

	*out_pcm = buf;
	*out_size = buf_len;
	return 0;
}
*/
import "C"

import (
	"errors"
	"fmt"
	"unsafe"

	"github.com/kerberos-io/agent/machinery/src/log"
	"github.com/zaf/g711"
)

// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is compiled in (requires the "ffmpeg" build tag).
func AACTranscodingAvailable() bool { return true }

// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
//
// NOTE(review): the struct holds no lock, so Transcode and Close must not
// be called concurrently — confirm the call sites serialize access.
type AACTranscoder struct {
	handle *C.aac_transcoder_t
}

// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
func NewAACTranscoder() (*AACTranscoder, error) {
	h := C.aac_transcoder_create()
	if h == nil {
		return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
	}
	log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
	return &AACTranscoder{handle: h}, nil
}

// Transcode converts an ADTS buffer (one or more AAC frames) into
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
// A (nil, nil) return means the decoder is still buffering input.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
	if t == nil || t.handle == nil || len(adtsData) == 0 {
		return nil, nil
	}

	var outPCM *C.uint8_t
	var outSize C.int

	ret := C.aac_transcode_to_pcm(
		t.handle,
		(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
		C.int(len(adtsData)),
		&outPCM, &outSize,
	)
	if ret < 0 {
		return nil, errors.New("AAC decode/resample failed")
	}
	if outSize == 0 || outPCM == nil {
		return nil, nil // decoder buffering, no output yet
	}
	defer C.av_free(unsafe.Pointer(outPCM))

	// Copy S16LE PCM to Go slice, then encode to µ-law.
	// NOTE(review): AV_SAMPLE_FMT_S16 is native-endian; this assumes a
	// little-endian host so the bytes match what g711.EncodeUlaw expects —
	// confirm if big-endian targets are ever supported.
	pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
	ulaw := g711.EncodeUlaw(pcm)

	// Log resampler details once.
	if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
		log.Log.Info(fmt.Sprintf(
			"webrtc.aac_transcoder: first output – resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
			int(t.handle.in_sample_rate), int(t.handle.in_channels)))
		// Prevent repeated logging by zeroing the field we check.
		t.handle.in_sample_rate = 0
	}

	return ulaw, nil
}

// Close releases all FFmpeg resources held by the transcoder.
func (t *AACTranscoder) Close() {
	if t != nil && t.handle != nil {
		C.aac_transcoder_destroy(t.handle)
		t.handle = nil
		log.Log.Info("webrtc.aac_transcoder: transcoder closed")
	}
}
|
||||
205
machinery/src/webrtc/aac_transcoder_stub.go
Normal file
205
machinery/src/webrtc/aac_transcoder_stub.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg

package webrtc

import (
	"bytes"
	"errors"
	"io"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/kerberos-io/agent/machinery/src/log"
)

// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is available in the current runtime (an ffmpeg binary on PATH).
func AACTranscodingAvailable() bool {
	_, err := exec.LookPath("ffmpeg")
	return err == nil
}

// lockedBuffer is a bytes.Buffer safe for concurrent use. ffmpeg's stderr
// is written by an os/exec copier goroutine while we may read it on a
// timeout or at close, so unsynchronized bytes.Buffer access would race.
type lockedBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Write appends p under the lock; lockedBuffer is used as cmd.Stderr.
func (b *lockedBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.Write(p)
}

// String returns the accumulated stderr under the lock.
func (b *lockedBuffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.String()
}

// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU
// (G.711 µ-law, 8 kHz mono). AAC input is written to ffmpeg's stdin and a
// background goroutine drains PCMU output from its stdout into outBuf.
type AACTranscoder struct {
	cmd    *exec.Cmd
	stdin  io.WriteCloser
	stdout io.ReadCloser
	stderr *lockedBuffer

	mu        sync.Mutex // serializes stdin writes and protects closed
	outMu     sync.Mutex // protects outBuf (filled by the stdout reader goroutine)
	outBuf    bytes.Buffer
	closed    bool
	closeOnce sync.Once
}

// NewAACTranscoder creates a runtime ffmpeg-based transcoder. It starts a
// long-lived ffmpeg process reading ADTS AAC on stdin and writing raw
// µ-law on stdout.
func NewAACTranscoder() (*AACTranscoder, error) {
	ffmpegPath, err := exec.LookPath("ffmpeg")
	if err != nil {
		return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
	}
	log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)

	cmd := exec.Command(
		ffmpegPath,
		"-hide_banner",
		"-loglevel", "error",
		"-fflags", "+nobuffer",
		"-flags", "low_delay",
		"-f", "aac",
		"-i", "pipe:0",
		"-vn",
		"-ac", "1",
		"-ar", "8000",
		"-acodec", "pcm_mulaw",
		"-f", "mulaw",
		"pipe:1",
	)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	// Capture stderr in a race-safe buffer so diagnostics can be read
	// while the process is still running.
	stderr := &lockedBuffer{}
	cmd.Stderr = stderr

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	t := &AACTranscoder{
		cmd:    cmd,
		stdin:  stdin,
		stdout: stdout,
		stderr: stderr,
	}

	// Drain ffmpeg's stdout continuously so the process never blocks on a
	// full pipe; produced PCMU bytes accumulate in outBuf until Transcode
	// collects them. The goroutine exits when the pipe is closed.
	go func() {
		buf := make([]byte, 4096)
		for {
			n, readErr := stdout.Read(buf)
			if n > 0 {
				t.outMu.Lock()
				_, _ = t.outBuf.Write(buf[:n])
				buffered := t.outBuf.Len()
				t.outMu.Unlock()
				// Log early output, then only periodically, to limit spam.
				if buffered <= 8192 || buffered%16000 == 0 {
					log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
				}
			}
			if readErr != nil {
				if readErr != io.EOF {
					log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
				}
				return
			}
		}
	}()

	log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
	return t, nil
}

// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
// It polls for output for up to ~75 ms; (nil, nil) means the decoder is
// still buffering and the caller should retry with the next frame.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
	if t == nil || len(adtsData) == 0 {
		return nil, nil
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.closed {
		return nil, errors.New("AAC transcoder is closed")
	}

	if _, err := t.stdin.Write(adtsData); err != nil {
		return nil, err
	}
	if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
		log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
	}

	deadline := time.Now().Add(75 * time.Millisecond)
	for {
		data := t.readAvailable()
		if len(data) > 0 {
			log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
			return data, nil
		}

		if time.Now().After(deadline) {
			if stderr := t.stderrString(); stderr != "" {
				log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
			} else {
				log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
			}
			return nil, nil
		}
		time.Sleep(5 * time.Millisecond)
	}
}

// readAvailable drains and returns whatever PCMU bytes the stdout reader
// has buffered so far, or nil when there are none.
func (t *AACTranscoder) readAvailable() []byte {
	t.outMu.Lock()
	defer t.outMu.Unlock()

	if t.outBuf.Len() == 0 {
		return nil
	}

	out := make([]byte, t.outBuf.Len())
	copy(out, t.outBuf.Bytes())
	t.outBuf.Reset()
	return out
}

// stderrString returns ffmpeg's accumulated stderr output, trimmed.
func (t *AACTranscoder) stderrString() string {
	if t == nil || t.stderr == nil {
		return ""
	}
	return strings.TrimSpace(t.stderr.String())
}

// Close stops the ffmpeg subprocess and releases its pipes. It is safe to
// call multiple times and on a nil receiver.
func (t *AACTranscoder) Close() {
	if t == nil {
		return
	}

	t.closeOnce.Do(func() {
		t.mu.Lock()
		t.closed = true
		if t.stdin != nil {
			_ = t.stdin.Close()
		}
		t.mu.Unlock()

		if t.stdout != nil {
			_ = t.stdout.Close()
		}

		if t.cmd != nil {
			_ = t.cmd.Process.Kill()
			// Use cmd.Wait (not Process.Wait) so os/exec can reap the
			// process and clean up its internal pipe-copying goroutines.
			_ = t.cmd.Wait()
			if stderr := t.stderrString(); stderr != "" {
				log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
			}
		}
	})
}
|
||||
@@ -4,13 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
//"github.com/izern/go-fdkaac/fdkaac"
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
@@ -139,6 +140,11 @@ func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
|
||||
return atomic.LoadInt64(&cm.peerConnectionCount)
|
||||
}
|
||||
|
||||
// GetActivePeerConnectionCount returns the current number of connected WebRTC readers.
|
||||
func GetActivePeerConnectionCount() int64 {
|
||||
return globalConnectionManager.GetPeerConnectionCount()
|
||||
}
|
||||
|
||||
// IncrementPeerCount atomically increments the peer connection count
|
||||
func (cm *ConnectionManager) IncrementPeerCount() int64 {
|
||||
return atomic.AddInt64(&cm.peerConnectionCount, 1)
|
||||
@@ -251,7 +257,79 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
|
||||
return nil
|
||||
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
|
||||
func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) {
|
||||
if mqttClient == nil {
|
||||
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description)
|
||||
return
|
||||
}
|
||||
|
||||
token := mqttClient.Publish(topic, 2, false, payload)
|
||||
go func() {
|
||||
if !token.WaitTimeout(5 * time.Second) {
|
||||
log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description)
|
||||
return
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// sendCandidateSignal delivers an ICE candidate to the remote peer. When the
// handshake carries direct signaling callbacks (websocket clients) the
// candidate is sent through them; otherwise it is packaged as an MQTT
// message and published to the hub topic for this agent.
func sendCandidateSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, candidateJSON []byte) {
	// Prefer the in-process signaling path (e.g. a local websocket client).
	if handshake.Signaling != nil && handshake.Signaling.SendCandidate != nil {
		if err := handshake.Signaling.SendCandidate(handshake.Payload.SessionID, string(candidateJSON)); err != nil {
			log.Log.Error("webrtc.main.sendCandidateSignal(): " + err.Error())
		}
		return
	}

	// Fallback: relay the candidate through the hub over MQTT.
	message := models.Message{
		Payload: models.Payload{
			Action:   "receive-hd-candidates",
			DeviceId: configuration.Config.Key,
			Value: map[string]interface{}{
				"candidate":  string(candidateJSON),
				"session_id": handshake.Payload.SessionID,
			},
		},
	}
	payload, err := models.PackageMQTTMessage(configuration, message)
	if err == nil {
		publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.Payload.SessionID)
	} else {
		log.Log.Info("webrtc.main.sendCandidateSignal(): while packaging mqtt message: " + err.Error())
	}
}
|
||||
|
||||
// sendAnswerSignal delivers the base64-encoded SDP answer to the remote
// peer: via the handshake's direct signaling callbacks when present
// (websocket clients), otherwise through the hub over MQTT.
func sendAnswerSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, answer pionWebRTC.SessionDescription) {
	encodedAnswer := base64.StdEncoding.EncodeToString([]byte(answer.SDP))

	// Prefer the in-process signaling path (e.g. a local websocket client).
	if handshake.Signaling != nil && handshake.Signaling.SendAnswer != nil {
		if err := handshake.Signaling.SendAnswer(handshake.Payload.SessionID, encodedAnswer); err != nil {
			log.Log.Error("webrtc.main.sendAnswerSignal(): " + err.Error())
		}
		return
	}

	// Fallback: relay the answer through the hub over MQTT.
	message := models.Message{
		Payload: models.Payload{
			Action:   "receive-hd-answer",
			DeviceId: configuration.Config.Key,
			Value: map[string]interface{}{
				// NOTE(review): the sdp is passed as []byte here while the
				// candidate path sends a plain string; if this map is
				// JSON-marshaled downstream the []byte becomes base64 a
				// second time — confirm the hub expects this encoding.
				"sdp":        []byte(encodedAnswer),
				"session_id": handshake.Payload.SessionID,
			},
		},
	}
	payload, err := models.PackageMQTTMessage(configuration, message)
	if err == nil {
		publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.Payload.SessionID)
	} else {
		log.Log.Info("webrtc.main.sendAnswerSignal(): while packaging mqtt message: " + err.Error())
	}
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.LiveHDHandshake) {
|
||||
|
||||
config := configuration.Config
|
||||
deviceKey := config.Key
|
||||
@@ -259,14 +337,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
turnServers := []string{config.TURNURI}
|
||||
turnServersUsername := config.TURNUsername
|
||||
turnServersCredential := config.TURNPassword
|
||||
handshakePayload := handshake.Payload
|
||||
|
||||
// We create a channel which will hold the candidates for this session.
|
||||
sessionKey := config.Key + "/" + handshake.SessionID
|
||||
sessionKey := config.Key + "/" + handshakePayload.SessionID
|
||||
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
|
||||
|
||||
// Set variables
|
||||
hubKey := handshake.HubKey
|
||||
sessionDescription := handshake.SessionDescription
|
||||
hubKey := handshakePayload.HubKey
|
||||
sessionDescription := handshakePayload.SessionDescription
|
||||
|
||||
// Create WebRTC object
|
||||
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
|
||||
@@ -423,12 +502,12 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
// Log ICE connection state changes for diagnostics
|
||||
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
|
||||
" (session: " + handshake.SessionID + ")")
|
||||
" (session: " + handshakePayload.SessionID + ")")
|
||||
})
|
||||
|
||||
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
|
||||
" (session: " + handshake.SessionID + ")")
|
||||
" (session: " + handshakePayload.SessionID + ")")
|
||||
|
||||
switch connectionState {
|
||||
case pionWebRTC.PeerConnectionStateDisconnected:
|
||||
@@ -437,9 +516,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
wrapper.disconnectMu.Lock()
|
||||
if wrapper.disconnectTimer == nil {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
|
||||
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
|
||||
disconnectGracePeriod.String() + " for recovery (session: " + handshakePayload.SessionID + ")")
|
||||
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshakePayload.SessionID + ")")
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
})
|
||||
}
|
||||
@@ -471,7 +550,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if wrapper.disconnectTimer != nil {
|
||||
wrapper.disconnectTimer.Stop()
|
||||
wrapper.disconnectTimer = nil
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshakePayload.SessionID + ")")
|
||||
}
|
||||
wrapper.disconnectMu.Unlock()
|
||||
|
||||
@@ -482,33 +561,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
|
||||
}()
|
||||
|
||||
// Iterate over the candidates and send them to the remote client
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case candidate, ok := <-candidateChannel:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
||||
candidateInit, decodeErr := decodeICECandidate(candidate)
|
||||
if decodeErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
|
||||
continue
|
||||
}
|
||||
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
||||
// The other peer will add this candidate by calling AddICECandidate.
|
||||
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
|
||||
@@ -556,27 +608,13 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candateBinary, err := json.Marshal(candidateJSON)
|
||||
if err == nil {
|
||||
valueMap["candidate"] = string(candateBinary)
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
valueMap["session_id"] = handshakePayload.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
|
||||
} else {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
|
||||
}
|
||||
|
||||
// We'll send the candidate to the hub
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Action: "receive-hd-candidates",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
|
||||
token.Wait()
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
|
||||
}
|
||||
sendCandidateSignal(configuration, mqttClient, hubKey, handshake, candateBinary)
|
||||
})
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
@@ -586,6 +624,35 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshakePayload.SessionID)
|
||||
}()
|
||||
|
||||
// Process remote candidates only after the remote description is set.
|
||||
// MQTT can deliver candidates before the SDP offer handling completes,
|
||||
// and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case candidate, ok := <-candidateChannel:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
||||
candidateInit, decodeErr := decodeICECandidate(candidate)
|
||||
if decodeErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
|
||||
continue
|
||||
}
|
||||
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
|
||||
@@ -600,27 +667,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
// Store peer connection in manager
|
||||
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
|
||||
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
|
||||
|
||||
// We'll send the candidate to the hub
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Action: "receive-hd-answer",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
|
||||
token.Wait()
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
|
||||
}
|
||||
sendAnswerSignal(configuration, mqttClient, hubKey, handshake, answer)
|
||||
}
|
||||
} else {
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
@@ -640,7 +689,12 @@ func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
|
||||
}
|
||||
|
||||
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
|
||||
var audioCodecNames []string
|
||||
hasAAC := false
|
||||
for _, s := range streams {
|
||||
if s.IsAudio {
|
||||
audioCodecNames = append(audioCodecNames, s.Name)
|
||||
}
|
||||
switch s.Name {
|
||||
case "OPUS":
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
|
||||
@@ -648,9 +702,18 @@ func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
|
||||
case "PCM_ALAW":
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
|
||||
case "AAC":
|
||||
hasAAC = true
|
||||
}
|
||||
}
|
||||
log.Log.Error("webrtc.main.NewAudioBroadcaster(): no supported audio codec found")
|
||||
if hasAAC {
|
||||
log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
|
||||
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
|
||||
} else if len(audioCodecNames) > 0 {
|
||||
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -666,18 +729,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
|
||||
|
||||
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
|
||||
var mimeType string
|
||||
var audioCodecNames []string
|
||||
hasAAC := false
|
||||
for _, stream := range streams {
|
||||
if stream.IsAudio {
|
||||
audioCodecNames = append(audioCodecNames, stream.Name)
|
||||
}
|
||||
if stream.Name == "OPUS" {
|
||||
mimeType = pionWebRTC.MimeTypeOpus
|
||||
} else if stream.Name == "PCM_MULAW" {
|
||||
mimeType = pionWebRTC.MimeTypePCMU
|
||||
} else if stream.Name == "PCM_ALAW" {
|
||||
mimeType = pionWebRTC.MimeTypePCMA
|
||||
} else if stream.Name == "AAC" {
|
||||
hasAAC = true
|
||||
}
|
||||
}
|
||||
if mimeType == "" {
|
||||
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
|
||||
return nil
|
||||
if hasAAC {
|
||||
mimeType = pionWebRTC.MimeTypePCMU
|
||||
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
|
||||
} else if len(audioCodecNames) > 0 {
|
||||
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
|
||||
return nil
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
|
||||
if err != nil {
|
||||
@@ -696,6 +774,11 @@ type streamState struct {
|
||||
receivedKeyFrame bool
|
||||
lastAudioSample *pionMedia.Sample
|
||||
lastVideoSample *pionMedia.Sample
|
||||
audioPacketsSeen int64
|
||||
aacPacketsSeen int64
|
||||
audioSamplesSent int64
|
||||
aacNoOutput int64
|
||||
aacErrors int64
|
||||
}
|
||||
|
||||
// codecSupport tracks which codecs are available in the stream
|
||||
@@ -843,22 +926,54 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster
|
||||
state.lastVideoSample = &sample
|
||||
}
|
||||
|
||||
// processAudioPacket processes an audio packet and writes samples to the broadcaster
|
||||
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, hasAAC bool) {
|
||||
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
|
||||
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
|
||||
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
|
||||
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
|
||||
if audioBroadcaster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if hasAAC {
|
||||
// AAC transcoding not yet implemented
|
||||
// TODO: Implement AAC to PCM_MULAW transcoding
|
||||
return
|
||||
state.audioPacketsSeen++
|
||||
|
||||
audioData := pkt.Data
|
||||
|
||||
if pkt.Codec == "AAC" {
|
||||
state.aacPacketsSeen++
|
||||
if transcoder == nil {
|
||||
state.aacErrors++
|
||||
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
|
||||
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
|
||||
}
|
||||
return // no transcoder – silently drop
|
||||
}
|
||||
pcmu, err := transcoder.Transcode(pkt.Data)
|
||||
if err != nil {
|
||||
state.aacErrors++
|
||||
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
|
||||
return
|
||||
}
|
||||
if len(pcmu) == 0 {
|
||||
state.aacNoOutput++
|
||||
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
|
||||
}
|
||||
return // decoder still buffering
|
||||
}
|
||||
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
|
||||
}
|
||||
audioData = pcmu
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if state.lastAudioSample != nil {
|
||||
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
|
||||
state.audioSamplesSent++
|
||||
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
|
||||
}
|
||||
audioBroadcaster.WriteSample(*state.lastAudioSample)
|
||||
}
|
||||
|
||||
@@ -892,8 +1007,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
return
|
||||
}
|
||||
|
||||
// Create AAC transcoder if needed (AAC → G.711 µ-law).
|
||||
var aacTranscoder *AACTranscoder
|
||||
if codecs.hasAAC && audioBroadcaster != nil {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
|
||||
t, err := NewAACTranscoder()
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
|
||||
} else {
|
||||
aacTranscoder = t
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
|
||||
defer aacTranscoder.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if config.Capture.TranscodingWebRTC == "true" {
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
|
||||
}
|
||||
|
||||
// Initialize streaming state
|
||||
@@ -903,6 +1032,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
}
|
||||
|
||||
defer func() {
|
||||
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
|
||||
if audioBroadcaster == nil {
|
||||
return 0
|
||||
}
|
||||
return audioBroadcaster.PeerCount()
|
||||
}()))
|
||||
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
|
||||
}()
|
||||
@@ -971,7 +1106,7 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
if pkt.IsVideo {
|
||||
processVideoPacket(pkt, state, videoBroadcaster, config)
|
||||
} else if pkt.IsAudio {
|
||||
processAudioPacket(pkt, state, audioBroadcaster, codecs.hasAAC)
|
||||
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,9 +45,9 @@
|
||||
"crypto": false
|
||||
},
|
||||
"scripts": {
|
||||
"start": "react-scripts start",
|
||||
"build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
|
||||
"test": "react-scripts test",
|
||||
"start": "DISABLE_ESLINT_PLUGIN=true react-scripts start",
|
||||
"build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
|
||||
"test": "DISABLE_ESLINT_PLUGIN=true react-scripts test",
|
||||
"eject": "react-scripts eject",
|
||||
"lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'",
|
||||
"format": "prettier --write \"**/*.{js,jsx,json,md}\""
|
||||
|
||||
@@ -237,4 +237,4 @@
|
||||
"remove_after_upload_enabled": "Enable delete on upload"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => {
|
||||
};
|
||||
};
|
||||
|
||||
export const getEvents = (eventfilter, onSuccess, onError) => {
|
||||
export const getEvents = (eventfilter, onSuccess, onError, append = false) => {
|
||||
return (dispatch) => {
|
||||
doGetEvents(
|
||||
eventfilter,
|
||||
@@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => {
|
||||
type: 'GET_EVENTS',
|
||||
events: data.events,
|
||||
filter: eventfilter,
|
||||
append,
|
||||
});
|
||||
if (onSuccess) {
|
||||
onSuccess();
|
||||
|
||||
@@ -26,6 +26,23 @@ import {
|
||||
import './Dashboard.scss';
|
||||
import ReactTooltip from 'react-tooltip';
|
||||
import config from '../../config';
|
||||
import { getConfig } from '../../actions/agent';
|
||||
|
||||
function createUUID() {
|
||||
if (
|
||||
typeof window !== 'undefined' &&
|
||||
window.crypto &&
|
||||
typeof window.crypto.randomUUID === 'function'
|
||||
) {
|
||||
return window.crypto.randomUUID();
|
||||
}
|
||||
|
||||
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => {
|
||||
const random = Math.floor(Math.random() * 16);
|
||||
const value = char === 'x' ? random : 8 + Math.floor(random / 4);
|
||||
return value.toString(16);
|
||||
});
|
||||
}
|
||||
|
||||
// eslint-disable-next-line react/prefer-stateless-function
|
||||
class Dashboard extends React.Component {
|
||||
@@ -33,48 +50,55 @@ class Dashboard extends React.Component {
|
||||
super();
|
||||
this.state = {
|
||||
liveviewLoaded: false,
|
||||
liveviewMode: 'webrtc',
|
||||
open: false,
|
||||
currentRecording: '',
|
||||
initialised: false,
|
||||
};
|
||||
this.videoRef = React.createRef();
|
||||
this.pendingRemoteCandidates = [];
|
||||
this.initialiseLiveview = this.initialiseLiveview.bind(this);
|
||||
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
|
||||
this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this);
|
||||
this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this);
|
||||
this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this);
|
||||
this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this);
|
||||
}
|
||||
|
||||
componentDidMount() {
|
||||
const liveview = document.getElementsByClassName('videocard-video');
|
||||
if (liveview && liveview.length > 0) {
|
||||
[this.liveviewElement] = liveview;
|
||||
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
|
||||
}
|
||||
const { dispatchGetConfig } = this.props;
|
||||
dispatchGetConfig(() => this.initialiseLiveview());
|
||||
this.initialiseLiveview();
|
||||
}
|
||||
|
||||
componentDidUpdate() {
|
||||
this.initialiseLiveview();
|
||||
componentDidUpdate(prevProps) {
|
||||
const { images, dashboard } = this.props;
|
||||
const { liveviewLoaded, liveviewMode } = this.state;
|
||||
const configLoaded = this.hasAgentConfig(this.props);
|
||||
const prevConfigLoaded = this.hasAgentConfig(prevProps);
|
||||
|
||||
if (!prevConfigLoaded && configLoaded) {
|
||||
this.initialiseLiveview();
|
||||
}
|
||||
|
||||
if (
|
||||
liveviewMode === 'sd' &&
|
||||
!liveviewLoaded &&
|
||||
prevProps.images !== images &&
|
||||
images.length > 0
|
||||
) {
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
}
|
||||
|
||||
if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) {
|
||||
this.initialiseLiveview();
|
||||
}
|
||||
}
|
||||
|
||||
componentWillUnmount() {
|
||||
if (this.liveviewElement) {
|
||||
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
|
||||
this.liveviewElement = null;
|
||||
}
|
||||
|
||||
if (this.requestStreamSubscription) {
|
||||
this.requestStreamSubscription.unsubscribe();
|
||||
this.requestStreamSubscription = null;
|
||||
}
|
||||
const { dispatchSend } = this.props;
|
||||
const message = {
|
||||
message_type: 'stop-sd',
|
||||
};
|
||||
dispatchSend(message);
|
||||
}
|
||||
|
||||
handleLiveviewLoad() {
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
this.stopSDLiveview();
|
||||
this.stopWebRTCLiveview();
|
||||
}
|
||||
|
||||
handleClose() {
|
||||
@@ -84,32 +108,378 @@ class Dashboard extends React.Component {
|
||||
});
|
||||
}
|
||||
|
||||
getCurrentTimestamp() {
|
||||
return Math.round(Date.now() / 1000);
|
||||
// eslint-disable-next-line react/sort-comp
|
||||
hasAgentConfig(props) {
|
||||
const currentProps = props || this.props;
|
||||
const { config: configResponse } = currentProps;
|
||||
return !!(configResponse && configResponse.config);
|
||||
}
|
||||
|
||||
browserSupportsWebRTC() {
|
||||
return (
|
||||
typeof window !== 'undefined' &&
|
||||
typeof window.RTCPeerConnection !== 'undefined'
|
||||
);
|
||||
}
|
||||
|
||||
buildPeerConnectionConfig() {
|
||||
const { config: configResponse } = this.props;
|
||||
const agentConfig =
|
||||
configResponse && configResponse.config ? configResponse.config : {};
|
||||
const iceServers = [];
|
||||
|
||||
if (agentConfig.stunuri) {
|
||||
iceServers.push({
|
||||
urls: [agentConfig.stunuri],
|
||||
});
|
||||
}
|
||||
|
||||
if (agentConfig.turnuri) {
|
||||
const turnServer = {
|
||||
urls: [agentConfig.turnuri],
|
||||
};
|
||||
|
||||
if (agentConfig.turn_username) {
|
||||
turnServer.username = agentConfig.turn_username;
|
||||
}
|
||||
|
||||
if (agentConfig.turn_password) {
|
||||
turnServer.credential = agentConfig.turn_password;
|
||||
}
|
||||
|
||||
iceServers.push(turnServer);
|
||||
}
|
||||
|
||||
return {
|
||||
iceServers,
|
||||
iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all',
|
||||
};
|
||||
}
|
||||
|
||||
initialiseLiveview() {
|
||||
const { initialised } = this.state;
|
||||
if (!initialised) {
|
||||
const message = {
|
||||
message_type: 'stream-sd',
|
||||
};
|
||||
const { connected, dispatchSend } = this.props;
|
||||
if (connected) {
|
||||
const { dashboard } = this.props;
|
||||
|
||||
if (initialised || !dashboard.cameraOnline) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.hasAgentConfig()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.browserSupportsWebRTC()) {
|
||||
this.startWebRTCLiveview();
|
||||
} else {
|
||||
this.fallbackToSDLiveview('WebRTC is not supported in this browser.');
|
||||
}
|
||||
}
|
||||
|
||||
initialiseSDLiveview() {
|
||||
if (this.requestStreamSubscription) {
|
||||
return;
|
||||
}
|
||||
|
||||
const message = {
|
||||
message_type: 'stream-sd',
|
||||
};
|
||||
const { connected, dispatchSend } = this.props;
|
||||
if (connected) {
|
||||
dispatchSend(message);
|
||||
}
|
||||
|
||||
const requestStreamInterval = interval(2000);
|
||||
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
|
||||
const { connected: isConnected } = this.props;
|
||||
if (isConnected) {
|
||||
dispatchSend(message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const requestStreamInterval = interval(2000);
|
||||
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
|
||||
const { connected: isConnected } = this.props;
|
||||
if (isConnected) {
|
||||
dispatchSend(message);
|
||||
}
|
||||
});
|
||||
this.setState({
|
||||
initialised: true,
|
||||
});
|
||||
stopSDLiveview() {
|
||||
if (this.requestStreamSubscription) {
|
||||
this.requestStreamSubscription.unsubscribe();
|
||||
this.requestStreamSubscription = null;
|
||||
}
|
||||
|
||||
const { dispatchSend } = this.props;
|
||||
dispatchSend({
|
||||
message_type: 'stop-sd',
|
||||
});
|
||||
}
|
||||
|
||||
stopWebRTCLiveview() {
|
||||
if (this.webrtcTimeout) {
|
||||
window.clearTimeout(this.webrtcTimeout);
|
||||
this.webrtcTimeout = null;
|
||||
}
|
||||
|
||||
if (this.webrtcSocket) {
|
||||
this.webrtcSocket.onopen = null;
|
||||
this.webrtcSocket.onmessage = null;
|
||||
this.webrtcSocket.onerror = null;
|
||||
this.webrtcSocket.onclose = null;
|
||||
this.webrtcSocket.close();
|
||||
this.webrtcSocket = null;
|
||||
}
|
||||
|
||||
if (this.webrtcPeerConnection) {
|
||||
this.webrtcPeerConnection.ontrack = null;
|
||||
this.webrtcPeerConnection.onicecandidate = null;
|
||||
this.webrtcPeerConnection.onconnectionstatechange = null;
|
||||
this.webrtcPeerConnection.close();
|
||||
this.webrtcPeerConnection = null;
|
||||
}
|
||||
|
||||
this.pendingRemoteCandidates = [];
|
||||
this.webrtcOfferStarted = false;
|
||||
this.webrtcSessionId = null;
|
||||
this.webrtcClientId = null;
|
||||
|
||||
if (this.videoRef.current) {
|
||||
this.videoRef.current.srcObject = null;
|
||||
}
|
||||
}
|
||||
|
||||
sendWebRTCMessage(messageType, message = {}) {
|
||||
if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.webrtcSocket.send(
|
||||
JSON.stringify({
|
||||
client_id: this.webrtcClientId,
|
||||
message_type: messageType,
|
||||
message,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
async handleWebRTCSignalMessage(event) {
|
||||
let data;
|
||||
try {
|
||||
data = JSON.parse(event.data);
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { message_type: messageType, message = {} } = data;
|
||||
const { session_id: sessionID, sdp, candidate } = message;
|
||||
|
||||
if (messageType === 'hello-back') {
|
||||
await this.beginWebRTCLiveview();
|
||||
return;
|
||||
}
|
||||
|
||||
if (sessionID && sessionID !== this.webrtcSessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (messageType) {
|
||||
case 'webrtc-answer':
|
||||
try {
|
||||
await this.webrtcPeerConnection.setRemoteDescription({
|
||||
type: 'answer',
|
||||
sdp: window.atob(sdp),
|
||||
});
|
||||
await this.flushPendingRemoteCandidates();
|
||||
} catch (error) {
|
||||
this.fallbackToSDLiveview(
|
||||
`Unable to apply WebRTC answer: ${error.message}`
|
||||
);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'webrtc-candidate': {
|
||||
try {
|
||||
const candidateInit = JSON.parse(candidate);
|
||||
if (
|
||||
this.webrtcPeerConnection.remoteDescription &&
|
||||
this.webrtcPeerConnection.remoteDescription.type
|
||||
) {
|
||||
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
|
||||
} else {
|
||||
this.pendingRemoteCandidates.push(candidateInit);
|
||||
}
|
||||
} catch (error) {
|
||||
this.fallbackToSDLiveview(
|
||||
`Unable to apply WebRTC candidate: ${error.message}`
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'webrtc-error':
|
||||
this.fallbackToSDLiveview(
|
||||
message.message || 'The agent could not start the WebRTC liveview.'
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async beginWebRTCLiveview() {
|
||||
if (!this.webrtcPeerConnection || this.webrtcOfferStarted) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.webrtcOfferStarted = true;
|
||||
const offer = await this.webrtcPeerConnection.createOffer({
|
||||
offerToReceiveAudio: true,
|
||||
offerToReceiveVideo: true,
|
||||
});
|
||||
await this.webrtcPeerConnection.setLocalDescription(offer);
|
||||
this.sendWebRTCMessage('stream-hd', {
|
||||
session_id: this.webrtcSessionId,
|
||||
sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp),
|
||||
});
|
||||
} catch (error) {
|
||||
this.fallbackToSDLiveview(
|
||||
`Unable to initialise WebRTC liveview: ${error.message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async flushPendingRemoteCandidates() {
|
||||
if (
|
||||
!this.webrtcPeerConnection ||
|
||||
!this.webrtcPeerConnection.remoteDescription
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (this.pendingRemoteCandidates.length > 0) {
|
||||
const candidateInit = this.pendingRemoteCandidates.shift();
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
|
||||
} catch (error) {
|
||||
this.fallbackToSDLiveview(
|
||||
`Unable to add remote ICE candidate: ${error.message}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startWebRTCLiveview() {
|
||||
if (this.webrtcPeerConnection || this.webrtcSocket) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.stopSDLiveview();
|
||||
|
||||
this.webrtcClientId = createUUID();
|
||||
this.webrtcSessionId = createUUID();
|
||||
this.pendingRemoteCandidates = [];
|
||||
|
||||
this.webrtcPeerConnection = new window.RTCPeerConnection(
|
||||
this.buildPeerConnectionConfig()
|
||||
);
|
||||
|
||||
this.webrtcPeerConnection.ontrack = (event) => {
|
||||
const [remoteStream] = event.streams;
|
||||
if (this.videoRef.current && remoteStream) {
|
||||
this.videoRef.current.srcObject = remoteStream;
|
||||
const playPromise = this.videoRef.current.play();
|
||||
if (playPromise && playPromise.catch) {
|
||||
playPromise.catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
};
|
||||
|
||||
this.webrtcPeerConnection.onicecandidate = (event) => {
|
||||
if (!event.candidate) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendWebRTCMessage('webrtc-candidate', {
|
||||
session_id: this.webrtcSessionId,
|
||||
candidate: JSON.stringify(event.candidate.toJSON()),
|
||||
});
|
||||
};
|
||||
|
||||
this.webrtcPeerConnection.onconnectionstatechange = () => {
|
||||
const { connectionState } = this.webrtcPeerConnection;
|
||||
if (connectionState === 'connected') {
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
connectionState === 'failed' ||
|
||||
connectionState === 'disconnected' ||
|
||||
connectionState === 'closed'
|
||||
) {
|
||||
this.fallbackToSDLiveview(
|
||||
`WebRTC connection ${connectionState}, falling back to SD liveview.`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtcSocket = new WebSocket(config.WS_URL);
|
||||
this.webrtcSocket.onopen = () => {
|
||||
this.sendWebRTCMessage('hello', {});
|
||||
};
|
||||
this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage;
|
||||
this.webrtcSocket.onerror = () => {
|
||||
this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.');
|
||||
};
|
||||
this.webrtcSocket.onclose = () => {
|
||||
const { liveviewLoaded } = this.state;
|
||||
if (!liveviewLoaded) {
|
||||
this.fallbackToSDLiveview('WebRTC signaling channel closed early.');
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtcTimeout = window.setTimeout(() => {
|
||||
const { liveviewLoaded } = this.state;
|
||||
if (!liveviewLoaded) {
|
||||
this.fallbackToSDLiveview(
|
||||
'WebRTC connection timed out, falling back to SD liveview.'
|
||||
);
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
this.setState({
|
||||
initialised: true,
|
||||
liveviewLoaded: false,
|
||||
liveviewMode: 'webrtc',
|
||||
});
|
||||
}
|
||||
|
||||
fallbackToSDLiveview(errorMessage) {
|
||||
const { liveviewMode } = this.state;
|
||||
|
||||
if (liveviewMode === 'sd' && this.requestStreamSubscription) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.stopWebRTCLiveview();
|
||||
if (errorMessage) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(errorMessage);
|
||||
}
|
||||
|
||||
this.setState(
|
||||
{
|
||||
initialised: true,
|
||||
liveviewLoaded: false,
|
||||
liveviewMode: 'sd',
|
||||
},
|
||||
() => {
|
||||
this.initialiseSDLiveview();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
openModal(file) {
|
||||
@@ -121,7 +491,8 @@ class Dashboard extends React.Component {
|
||||
|
||||
render() {
|
||||
const { dashboard, t, images } = this.props;
|
||||
const { liveviewLoaded, open, currentRecording } = this.state;
|
||||
const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state;
|
||||
const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0;
|
||||
|
||||
// We check if the camera was getting a valid frame
|
||||
// during the last 5 seconds, otherwise we assume the camera is offline.
|
||||
@@ -175,7 +546,6 @@ class Dashboard extends React.Component {
|
||||
divider="0"
|
||||
footer={t('dashboard.total_recordings')}
|
||||
/>
|
||||
|
||||
<Link to="/settings">
|
||||
<Card
|
||||
title="IP Camera"
|
||||
@@ -314,7 +684,9 @@ class Dashboard extends React.Component {
|
||||
)}
|
||||
</div>
|
||||
<div>
|
||||
<h2>{t('dashboard.live_view')}</h2>
|
||||
<h2>
|
||||
{t('dashboard.live_view')} ({listenerCount})
|
||||
</h2>
|
||||
{(!liveviewLoaded || !isCameraOnline) && (
|
||||
<SetupBox
|
||||
btnicon="preferences"
|
||||
@@ -331,12 +703,16 @@ class Dashboard extends React.Component {
|
||||
liveviewLoaded && isCameraOnline ? 'visible' : 'hidden',
|
||||
}}
|
||||
>
|
||||
<ImageCard
|
||||
imageSrc={`data:image/png;base64, ${
|
||||
images.length ? images[0] : ''
|
||||
}`}
|
||||
onerror=""
|
||||
/>
|
||||
{liveviewMode === 'webrtc' ? (
|
||||
<video ref={this.videoRef} autoPlay muted playsInline />
|
||||
) : (
|
||||
<ImageCard
|
||||
imageSrc={`data:image/png;base64, ${
|
||||
images.length ? images[0] : ''
|
||||
}`}
|
||||
onerror=""
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -348,20 +724,25 @@ class Dashboard extends React.Component {
|
||||
|
||||
const mapStateToProps = (state /* , ownProps */) => ({
|
||||
dashboard: state.agent.dashboard,
|
||||
config: state.agent.config,
|
||||
connected: state.wss.connected,
|
||||
images: state.wss.images,
|
||||
});
|
||||
|
||||
const mapDispatchToProps = (dispatch) => ({
|
||||
dispatchSend: (message) => dispatch(send(message)),
|
||||
dispatchGetConfig: (onSuccess, onError) =>
|
||||
dispatch(getConfig(onSuccess, onError)),
|
||||
});
|
||||
|
||||
Dashboard.propTypes = {
|
||||
dashboard: PropTypes.object.isRequired,
|
||||
config: PropTypes.object.isRequired,
|
||||
connected: PropTypes.bool.isRequired,
|
||||
images: PropTypes.array.isRequired,
|
||||
t: PropTypes.func.isRequired,
|
||||
dispatchSend: PropTypes.func.isRequired,
|
||||
dispatchGetConfig: PropTypes.func.isRequired,
|
||||
};
|
||||
|
||||
export default withTranslation()(
|
||||
|
||||
@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
|
||||
import { withTranslation } from 'react-i18next';
|
||||
import {
|
||||
Breadcrumb,
|
||||
ControlBar,
|
||||
VideoCard,
|
||||
Button,
|
||||
Modal,
|
||||
@@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent';
|
||||
import config from '../../config';
|
||||
import './Media.scss';
|
||||
|
||||
function formatDateTimeLocal(date) {
|
||||
const year = date.getFullYear();
|
||||
const month = String(date.getMonth() + 1).padStart(2, '0');
|
||||
const day = String(date.getDate()).padStart(2, '0');
|
||||
const hours = String(date.getHours()).padStart(2, '0');
|
||||
const minutes = String(date.getMinutes()).padStart(2, '0');
|
||||
|
||||
return `${year}-${month}-${day}T${hours}:${minutes}`;
|
||||
}
|
||||
|
||||
function getDefaultTimeWindow() {
|
||||
const endDate = new Date();
|
||||
const startDate = new Date(endDate.getTime() - 60 * 60 * 1000);
|
||||
|
||||
return {
|
||||
startDateTime: formatDateTimeLocal(startDate),
|
||||
endDateTime: formatDateTimeLocal(endDate),
|
||||
timestamp_offset_start: Math.floor(startDate.getTime() / 1000),
|
||||
timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeInputValue(valueOrEvent) {
|
||||
if (valueOrEvent && valueOrEvent.target) {
|
||||
return valueOrEvent.target.value;
|
||||
}
|
||||
|
||||
return valueOrEvent;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line react/prefer-stateless-function
|
||||
class Media extends React.Component {
|
||||
constructor() {
|
||||
super();
|
||||
this.state = {
|
||||
timestamp_offset_start: 0,
|
||||
timestamp_offset_end: 0,
|
||||
|
||||
const defaultTimeWindow = getDefaultTimeWindow();
|
||||
|
||||
const initialFilter = {
|
||||
timestamp_offset_start: defaultTimeWindow.timestamp_offset_start,
|
||||
timestamp_offset_end: defaultTimeWindow.timestamp_offset_end,
|
||||
number_of_elements: 12,
|
||||
};
|
||||
|
||||
this.state = {
|
||||
appliedFilter: initialFilter,
|
||||
startDateTime: defaultTimeWindow.startDateTime,
|
||||
endDateTime: defaultTimeWindow.endDateTime,
|
||||
isScrolling: false,
|
||||
open: false,
|
||||
currentRecording: '',
|
||||
@@ -32,7 +72,8 @@ class Media extends React.Component {
|
||||
|
||||
componentDidMount() {
|
||||
const { dispatchGetEvents } = this.props;
|
||||
dispatchGetEvents(this.state);
|
||||
const { appliedFilter } = this.state;
|
||||
dispatchGetEvents(appliedFilter);
|
||||
document.addEventListener('scroll', this.trackScrolling);
|
||||
}
|
||||
|
||||
@@ -49,29 +90,107 @@ class Media extends React.Component {
|
||||
|
||||
trackScrolling = () => {
|
||||
const { events, dispatchGetEvents } = this.props;
|
||||
const { isScrolling } = this.state;
|
||||
const { isScrolling, appliedFilter } = this.state;
|
||||
const wrappedElement = document.getElementById('loader');
|
||||
if (!isScrolling && this.isBottom(wrappedElement)) {
|
||||
this.setState({
|
||||
isScrolling: true,
|
||||
});
|
||||
// Get last element
|
||||
const lastElement = events[events.length - 1];
|
||||
if (lastElement) {
|
||||
this.setState({
|
||||
if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.setState({
|
||||
isScrolling: true,
|
||||
});
|
||||
|
||||
// Get last element
|
||||
const lastElement = events[events.length - 1];
|
||||
if (lastElement) {
|
||||
dispatchGetEvents(
|
||||
{
|
||||
...appliedFilter,
|
||||
timestamp_offset_end: parseInt(lastElement.timestamp, 10),
|
||||
});
|
||||
dispatchGetEvents(this.state, () => {
|
||||
},
|
||||
() => {
|
||||
setTimeout(() => {
|
||||
this.setState({
|
||||
isScrolling: false,
|
||||
});
|
||||
}, 1000);
|
||||
});
|
||||
}
|
||||
},
|
||||
() => {
|
||||
this.setState({
|
||||
isScrolling: false,
|
||||
});
|
||||
},
|
||||
true
|
||||
);
|
||||
} else {
|
||||
this.setState({
|
||||
isScrolling: false,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
buildEventFilter(startDateTime, endDateTime) {
|
||||
const { appliedFilter } = this.state;
|
||||
|
||||
return {
|
||||
timestamp_offset_start: this.getTimestampFromInput(
|
||||
startDateTime,
|
||||
'start'
|
||||
),
|
||||
timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'),
|
||||
number_of_elements: appliedFilter.number_of_elements,
|
||||
};
|
||||
}
|
||||
|
||||
handleDateFilterChange(field, value) {
|
||||
const { dispatchGetEvents } = this.props;
|
||||
const { startDateTime, endDateTime } = this.state;
|
||||
const normalizedValue = normalizeInputValue(value);
|
||||
const nextStartDateTime =
|
||||
field === 'startDateTime' ? normalizedValue : startDateTime;
|
||||
const nextEndDateTime =
|
||||
field === 'endDateTime' ? normalizedValue : endDateTime;
|
||||
const nextFilter = this.buildEventFilter(
|
||||
nextStartDateTime,
|
||||
nextEndDateTime
|
||||
);
|
||||
const shouldApplyFilter =
|
||||
(nextStartDateTime === '' || nextStartDateTime.length === 16) &&
|
||||
(nextEndDateTime === '' || nextEndDateTime.length === 16);
|
||||
|
||||
this.setState(
|
||||
{
|
||||
[field]: normalizedValue,
|
||||
appliedFilter: shouldApplyFilter
|
||||
? nextFilter
|
||||
: this.state.appliedFilter,
|
||||
isScrolling: false,
|
||||
},
|
||||
() => {
|
||||
if (shouldApplyFilter) {
|
||||
dispatchGetEvents(nextFilter);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
getTimestampFromInput(value, boundary) {
|
||||
if (!value) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const date = new Date(value);
|
||||
if (Number.isNaN(date.getTime())) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const seconds = Math.floor(date.getTime() / 1000);
|
||||
if (boundary === 'end') {
|
||||
return seconds + 59;
|
||||
}
|
||||
return seconds;
|
||||
}
|
||||
|
||||
isBottom(el) {
|
||||
return el.getBoundingClientRect().bottom + 50 <= window.innerHeight;
|
||||
}
|
||||
@@ -85,7 +204,9 @@ class Media extends React.Component {
|
||||
|
||||
render() {
|
||||
const { events, eventsLoaded, t } = this.props;
|
||||
const { isScrolling, open, currentRecording } = this.state;
|
||||
const { isScrolling, open, currentRecording, startDateTime, endDateTime } =
|
||||
this.state;
|
||||
|
||||
return (
|
||||
<div id="media">
|
||||
<Breadcrumb
|
||||
@@ -102,6 +223,37 @@ class Media extends React.Component {
|
||||
</Link>
|
||||
</Breadcrumb>
|
||||
|
||||
<div className="media-control-bar">
|
||||
<ControlBar>
|
||||
<div className="media-filters">
|
||||
<div className="media-filters__field">
|
||||
<label htmlFor="recordings-start-time">Start time</label>
|
||||
<input
|
||||
className="media-filters__input"
|
||||
id="recordings-start-time"
|
||||
type="datetime-local"
|
||||
value={startDateTime}
|
||||
onChange={(value) =>
|
||||
this.handleDateFilterChange('startDateTime', value)
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
<div className="media-filters__field">
|
||||
<label htmlFor="recordings-end-time">End time</label>
|
||||
<input
|
||||
className="media-filters__input"
|
||||
id="recordings-end-time"
|
||||
type="datetime-local"
|
||||
value={endDateTime}
|
||||
onChange={(value) =>
|
||||
this.handleDateFilterChange('endDateTime', value)
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</ControlBar>
|
||||
</div>
|
||||
|
||||
<div className="stats grid-container --four-columns">
|
||||
{events.map((event) => (
|
||||
<div
|
||||
@@ -123,6 +275,11 @@ class Media extends React.Component {
|
||||
</div>
|
||||
))}
|
||||
</div>
|
||||
{events.length === 0 && eventsLoaded === 0 && (
|
||||
<div className="media-empty-state">
|
||||
No recordings found in the selected time range.
|
||||
</div>
|
||||
)}
|
||||
{open && (
|
||||
<Modal>
|
||||
<ModalHeader
|
||||
@@ -182,13 +339,13 @@ const mapStateToProps = (state /* , ownProps */) => ({
|
||||
});
|
||||
|
||||
const mapDispatchToProps = (dispatch) => ({
|
||||
dispatchGetEvents: (eventFilter, success, error) =>
|
||||
dispatch(getEvents(eventFilter, success, error)),
|
||||
dispatchGetEvents: (eventFilter, success, error, append) =>
|
||||
dispatch(getEvents(eventFilter, success, error, append)),
|
||||
});
|
||||
|
||||
Media.propTypes = {
|
||||
t: PropTypes.func.isRequired,
|
||||
events: PropTypes.objectOf(PropTypes.object).isRequired,
|
||||
events: PropTypes.arrayOf(PropTypes.object).isRequired,
|
||||
eventsLoaded: PropTypes.number.isRequired,
|
||||
dispatchGetEvents: PropTypes.func.isRequired,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,103 @@
|
||||
#media {
|
||||
.media-control-bar {
|
||||
.control-bar {
|
||||
display: block;
|
||||
padding: 0 var(--main-content-gutter);
|
||||
}
|
||||
|
||||
.control-bar .filtering {
|
||||
display: block;
|
||||
}
|
||||
|
||||
.control-bar .filtering > * {
|
||||
border-right: 0;
|
||||
flex: 1 1 100% !important;
|
||||
max-width: none;
|
||||
width: 100%;
|
||||
}
|
||||
}
|
||||
|
||||
.media-filters {
|
||||
align-items: stretch;
|
||||
display: grid;
|
||||
gap: 0;
|
||||
grid-template-columns: repeat(2, minmax(0, 1fr));
|
||||
min-width: 0;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.media-filters__field {
|
||||
border-right: 1px solid var(--bg-muted);
|
||||
min-width: 0;
|
||||
padding: 16px 24px;
|
||||
|
||||
label {
|
||||
display: block;
|
||||
font-size: 14px;
|
||||
font-weight: 600;
|
||||
margin-bottom: 8px;
|
||||
white-space: nowrap;
|
||||
}
|
||||
}
|
||||
|
||||
.media-filters__field:first-child {
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
.media-filters__input {
|
||||
appearance: none;
|
||||
background: var(--white);
|
||||
border: 1px solid var(--grey-light);
|
||||
border-radius: 8px;
|
||||
box-sizing: border-box;
|
||||
color: var(--black);
|
||||
font-size: 16px;
|
||||
min-height: 48px;
|
||||
padding: 0 14px 0 0;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.media-filters__input::-webkit-datetime-edit,
|
||||
.media-filters__input::-webkit-datetime-edit-fields-wrapper {
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
.media-filters__input:focus {
|
||||
border-color: var(--oss);
|
||||
outline: 0;
|
||||
}
|
||||
|
||||
.media-filters__field:first-child .media-filters__input {
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
.media-filters__field:last-child {
|
||||
border-right: 0;
|
||||
padding-right: 0;
|
||||
}
|
||||
|
||||
@media (max-width: 700px) {
|
||||
.media-filters {
|
||||
grid-template-columns: 1fr;
|
||||
}
|
||||
|
||||
.media-filters__field {
|
||||
border-right: 0;
|
||||
border-bottom: 1px solid var(--bg-muted);
|
||||
padding-left: 0;
|
||||
padding-right: 0;
|
||||
}
|
||||
|
||||
.media-filters__field:last-child {
|
||||
border-bottom: 0;
|
||||
}
|
||||
}
|
||||
|
||||
.media-empty-state {
|
||||
margin: 24px 0;
|
||||
opacity: 0.8;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
#loader {
|
||||
display: flex;
|
||||
|
||||
@@ -123,16 +123,12 @@ const agent = (
|
||||
};
|
||||
|
||||
case 'GET_EVENTS':
|
||||
const { timestamp_offset_end } = action.filter;
|
||||
const { events } = action;
|
||||
return {
|
||||
...state,
|
||||
eventsLoaded: events.length,
|
||||
events:
|
||||
timestamp_offset_end === 0
|
||||
? [...events]
|
||||
: [...state.events, ...events],
|
||||
eventfilter: action.eventfilter,
|
||||
events: action.append ? [...state.events, ...events] : [...events],
|
||||
eventfilter: action.filter,
|
||||
};
|
||||
|
||||
default:
|
||||
|
||||
Reference in New Issue
Block a user