Compare commits

...

5 Commits

Author SHA1 Message Date
Cedric Verstraeten
9cf9babd73 introduce camera connected variable + allow MQTT stream to be connected even when no camera attached 2023-06-09 14:44:09 +02:00
Cedric Verstraeten
229c246e1c adding ctx (support) + unblock when unsupported codec 2023-06-08 21:50:10 +02:00
Cedric Verstraeten
15d9bcda4f decoding issue caused new mongodb adapter to fail 2023-06-07 22:01:44 +02:00
Cedric Verstraeten
068063695e time table might be empty 2023-06-07 20:24:09 +02:00
Cedric Verstraeten
b1722844f3 fix config overriding 2023-06-07 20:19:50 +02:00
12 changed files with 232 additions and 118 deletions

View File

@@ -2,6 +2,9 @@ module github.com/kerberos-io/agent/machinery
go 1.19
// replace github.com/kerberos-io/joy4 v1.0.57 => ../../../../github.com/kerberos-io/joy4
// replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
require (
github.com/InVisionApp/conjungo v1.1.0
github.com/appleboy/gin-jwt/v2 v2.9.1
@@ -21,7 +24,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.57
github.com/kerberos-io/joy4 v1.0.58
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.57 h1:/8epNAJv4cOzBG8pFiM9hVNXfwsgA+8/2nHQ2yOeyII=
github.com/kerberos-io/joy4 v1.0.57/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"os"
"time"
@@ -16,7 +17,6 @@ import (
var VERSION = "3.0.0"
func main() {
// You might be interested in debugging the agent.
if os.Getenv("DATADOG_AGENT_ENABLED") == "true" {
if os.Getenv("DATADOG_AGENT_K8S_ENABLED") == "true" {
@@ -111,8 +111,14 @@ func main() {
}
}
// Create a cancelable context, which will be used to cancel and restart.
// This is used to restart the agent when the configuration is updated.
ctx, cancel := context.WithCancel(context.Background())
// Bootstrapping the agent
communication := models.Communication{
Context: &ctx,
CancelContext: &cancel,
HandleBootstrap: make(chan string, 1),
}
go components.Bootstrap(&configuration, &communication)

View File

@@ -1,6 +1,7 @@
package capture
import (
"context"
"strconv"
"sync"
"time"
@@ -15,9 +16,9 @@ import (
"github.com/kerberos-io/joy4/format"
)
func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
format.RegisterAll()
infile, err := avutil.Open(url)
infile, err := avutil.Open(ctx, url)
if err == nil {
streams, errstreams := infile.Streams()
return infile, streams, errstreams

View File

@@ -2,6 +2,7 @@
package capture
import (
"context"
"os"
"strconv"
"time"
@@ -431,6 +432,10 @@ func VerifyCamera(c *gin.Context) {
var cameraStreams models.CameraStreams
err := c.BindJSON(&cameraStreams)
// Should return in 5 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err == nil {
streamType := c.Param("streamType")
@@ -442,7 +447,7 @@ func VerifyCamera(c *gin.Context) {
if streamType == "secondary" {
rtspUrl = cameraStreams.SubRTSP
}
_, codecs, err := OpenRTSP(rtspUrl)
_, codecs, err := OpenRTSP(ctx, rtspUrl)
if err == nil {
videoIdx := -1

View File

@@ -83,17 +83,25 @@ func OpenConfig(configuration *models.Configuration) {
db := client.Database(database.DatabaseName)
collection := db.Collection("configuration")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collection.FindOne(ctx, bson.M{
var globalConfig models.Config
err := collection.FindOne(context.Background(), bson.M{
"type": "global",
}).Decode(&configuration.GlobalConfig)
}).Decode(&globalConfig)
if err != nil {
log.Log.Error("Could not find global configuration, using default configuration.")
}
configuration.GlobalConfig = globalConfig
collection.FindOne(ctx, bson.M{
var customConfig models.Config
deploymentName := os.Getenv("DEPLOYMENT_NAME")
err = collection.FindOne(context.Background(), bson.M{
"type": "config",
"name": os.Getenv("DEPLOYMENT_NAME"),
}).Decode(&configuration.CustomConfig)
"name": deploymentName,
}).Decode(&customConfig)
if err != nil {
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
}
configuration.CustomConfig = customConfig
// We will merge both configs in a single config file.
// Read again from database but this store overwrite the same object.
@@ -209,8 +217,7 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
/* ONVIF connnection settings */
case "AGENT_CAPTURE_IPCAMERA_ONVIF":
isEnabled := value == " true"
configuration.Config.Capture.IPCamera.ONVIF = isEnabled
configuration.Config.Capture.IPCamera.ONVIF = value
break
case "AGENT_CAPTURE_IPCAMERA_ONVIF_XADDR":
configuration.Config.Capture.IPCamera.ONVIFXAddr = value
@@ -440,9 +447,11 @@ func SaveConfig(config models.Config, configuration *models.Configuration, commu
return err
}
select {
case communication.HandleBootstrap <- "restart":
default:
if communication.CameraConnected {
select {
case communication.HandleBootstrap <- "restart":
default:
}
}
communication.IsConfiguring.UnSet()

View File

@@ -1,12 +1,14 @@
package components
import (
"context"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
"github.com/kerberos-io/agent/machinery/src/capture"
@@ -53,6 +55,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
communication.HandleLiveSD = make(chan int64, 1)
communication.HandleLiveHDKeepalive = make(chan string, 1)
communication.HandleLiveHDPeers = make(chan string, 1)
communication.HandleONVIF = make(chan models.OnvifAction, 1)
communication.IsConfiguring = abool.New()
// Before starting the agent, we have a control goroutine, that might
@@ -67,33 +70,71 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
// Handle heartbeats
go cloud.HandleHeartBeat(configuration, communication, uptimeStart)
// We'll create a MQTT handler, which will be used to communicate with Kerberos Hub.
// Configure a MQTT client which helps for a bi-directional communication
mqttClient := routers.ConfigureMQTT(configuration, communication)
// Run the agent and fire up all the other
// goroutines which do image capture, motion detection, onvif, etc.
for {
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
status := RunAgent(configuration, communication, mqttClient, uptimeStart, cameraSettings, decoder, subDecoder)
if status == "stop" {
break
}
// We will reconfigure or restart the agent, we will mark the agent as not connected.
communication.CameraConnected = false
// We will re open the configuration, might have changed :O!
OpenConfig(configuration)
// We will override the configuration with the environment variables
OverrideWithEnvironmentVariables(configuration)
// Reset the MQTT client, might have provided new information, so we need to reconnect.
if routers.HasMQTTClientModified(configuration) {
routers.DisconnectMQTT(mqttClient, &configuration.Config)
mqttClient = routers.ConfigureMQTT(configuration, communication)
}
// We will create a new cancelable context, which will be used to cancel and restart.
// This is used to restart the agent when the configuration is updated.
ctx, cancel := context.WithCancel(context.Background())
communication.Context = &ctx
communication.CancelContext = &cancel
}
log.Log.Debug("Bootstrap: finished")
}
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
func RunAgent(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
log.Log.Debug("RunAgent: bootstrapping agent")
config := configuration.Config
status := "not started"
// Currently only support H264 encoded cameras, this will change.
// Establishing the camera connection
rtspUrl := config.Capture.IPCamera.RTSP
infile, streams, err := capture.OpenRTSP(rtspUrl)
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
if videoStream == nil {
log.Log.Error("RunAgent: no video stream found, might be the wrong codec (we only support H264 for the moment)")
time.Sleep(time.Second * 3)
return status
}
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
var queue *pubsub.Queue
var subQueue *pubsub.Queue
@@ -101,11 +142,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
var decoderMutex sync.Mutex
var subDecoderMutex sync.Mutex
status := "not started"
if err == nil {
log.Log.Info("RunAgent: opened RTSP stream" + rtspUrl)
log.Log.Info("RunAgent: opened RTSP stream: " + rtspUrl)
// We might have a secondary rtsp url, so we might need to use that.
var subInfile av.DemuxCloser
@@ -113,23 +152,21 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
subStreamEnabled := false
subRtspUrl := config.Capture.IPCamera.SubRTSP
if subRtspUrl != "" && subRtspUrl != rtspUrl {
subInfile, subStreams, err = capture.OpenRTSP(subRtspUrl)
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
if err == nil {
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
subStreamEnabled = true
}
videoStream, _ := capture.GetVideoStream(subStreams)
if videoStream == nil {
log.Log.Error("RunAgent: no video substream found, might be the wrong codec (we only support H264 for the moment)")
time.Sleep(time.Second * 3)
return status
}
}
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
if cameraSettings.Initialized {
decoder.Close()
if subStreamEnabled {
@@ -187,9 +224,8 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
subQueue.WriteHeader(subStreams)
}
// Configure a MQTT client which helps for a bi-directional communication
communication.HandleONVIF = make(chan models.OnvifAction, 1)
mqttClient := routers.ConfigureMQTT(configuration, communication)
// If we reach this point, we have a working RTSP connection.
communication.CameraConnected = true
// Handle the camera stream
go capture.HandleStream(infile, queue, communication)
@@ -240,8 +276,12 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// This will go into a blocking state, once this channel is triggered
// the agent will cleanup and restart.
status = <-communication.HandleBootstrap
// Cancel the main context, this will stop all the other goroutines.
(*communication.CancelContext)()
// Here we are cleaning up everything!
if configuration.Config.Offline != "true" {
communication.HandleUpload <- "stop"
@@ -265,16 +305,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
subQueue = nil
communication.SubQueue = nil
}
close(communication.HandleONVIF)
communication.HandleONVIF = nil
close(communication.HandleLiveHDHandshake)
communication.HandleLiveHDHandshake = nil
close(communication.HandleMotion)
communication.HandleMotion = nil
// Disconnect MQTT
routers.DisconnectMQTT(mqttClient, &configuration.Config)
// Wait a few seconds to stop the decoder.
time.Sleep(time.Second * 3)
@@ -303,29 +336,32 @@ func ControlAgent(communication *models.Communication) {
var previousPacket int64 = 0
var occurence = 0
for {
packetsR := packageCounter.Load().(int64)
if packetsR == previousPacket {
// If we are already reconfiguring,
// we dont need to check if the stream is blocking.
if !communication.IsConfiguring.IsSet() {
occurence = occurence + 1
// If camera is connected, we'll check if we are still receiving packets.
if communication.CameraConnected {
packetsR := packageCounter.Load().(int64)
if packetsR == previousPacket {
// If we are already reconfiguring,
// we dont need to check if the stream is blocking.
if !communication.IsConfiguring.IsSet() {
occurence = occurence + 1
}
} else {
occurence = 0
}
} else {
occurence = 0
log.Log.Info("ControlAgent: Number of packets read " + strconv.FormatInt(packetsR, 10))
// After 15 seconds without activity this is thrown..
if occurence == 3 {
log.Log.Info("Main: Restarting machinery.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurence = 0
}
previousPacket = packageCounter.Load().(int64)
}
log.Log.Info("ControlAgent: Number of packets read " + strconv.FormatInt(packetsR, 10))
// After 15 seconds without activity this is thrown..
if occurence == 3 {
log.Log.Info("Main: Restarting machinery.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurence = 0
}
previousPacket = packageCounter.Load().(int64)
time.Sleep(5 * time.Second)
}
}()

View File

@@ -10,15 +10,12 @@ import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
geo "github.com/kellydunn/golang-geo"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/joy4/av/pubsub"
//"github.com/whorfin/go-libjpeg/jpeg"
geo "github.com/kellydunn/golang-geo"
"github.com/kerberos-io/joy4/av"
"github.com/kerberos-io/joy4/av/pubsub"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
)

View File

@@ -1,6 +1,7 @@
package models
import (
"context"
"sync"
"sync/atomic"
@@ -12,6 +13,8 @@ import (
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
Context *context.Context
CancelContext *context.CancelFunc
PackageCounter *atomic.Value
LastPacketTimer *atomic.Value
CloudTimestamp *atomic.Value

View File

@@ -73,7 +73,7 @@ type IPCamera struct {
RTSP string `json:"rtsp"`
SubRTSP string `json:"sub_rtsp"`
FPS string `json:"fps"`
ONVIF bool `json:"onvif,omitempty" bson:"onvif"`
ONVIF string `json:"onvif,omitempty" bson:"onvif"`
ONVIFXAddr string `json:"onvif_xaddr,omitempty" bson:"onvif_xaddr"`
ONVIFUsername string `json:"onvif_username,omitempty" bson:"onvif_username"`
ONVIFPassword string `json:"onvif_password,omitempty" bson:"onvif_password"`

View File

@@ -13,10 +13,38 @@ import (
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
// We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection.
// If we update the configuration but no new MQTT settings are provided, we don't need to restart it.
var PREV_MQTTURI string
var PREV_MQTTUsername string
var PREV_MQTTPassword string
var PREV_HubKey string
var PREV_AgentKey string
func HasMQTTClientModified(configuration *models.Configuration) bool {
MTTURI := configuration.Config.MQTTURI
MTTUsername := configuration.Config.MQTTUsername
MQTTPassword := configuration.Config.MQTTPassword
HubKey := configuration.Config.HubKey
AgentKey := configuration.Config.Key
if PREV_MQTTURI != MTTURI || PREV_MQTTUsername != MTTUsername || PREV_MQTTPassword != MQTTPassword || PREV_HubKey != HubKey || PREV_AgentKey != AgentKey {
log.Log.Info("HasMQTTClientModified: MQTT settings have been modified, restarting MQTT client.")
return true
}
return false
}
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
config := configuration.Config
// Set the MQTT settings.
PREV_MQTTURI = configuration.Config.MQTTURI
PREV_MQTTUsername = configuration.Config.MQTTUsername
PREV_MQTTPassword = configuration.Config.MQTTPassword
PREV_HubKey = configuration.Config.HubKey
PREV_AgentKey = configuration.Config.Key
if config.Offline == "true" {
log.Log.Info("ConfigureMQTT: not starting as running in Offline mode.")
} else {
@@ -78,7 +106,6 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
webrtc.CandidateArrays = make(map[string](chan string))
opts.OnConnect = func(c mqtt.Client) {
// We managed to connect to the MQTT broker, hurray!
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
@@ -117,11 +144,15 @@ func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configurati
config := configuration.Config
topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live"
mqttClient.Subscribe(topicRequest, 0, func(c mqtt.Client, msg mqtt.Message) {
select {
case communication.HandleLiveSD <- time.Now().Unix():
default:
if communication.CameraConnected {
select {
case communication.HandleLiveSD <- time.Now().Unix():
default:
}
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream.")
} else {
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream, but camera is not connected.")
}
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream.")
msg.Ack()
})
}
@@ -130,12 +161,16 @@ func MQTTListenerHandleLiveHDHandshake(mqttClient mqtt.Client, hubKey string, co
config := configuration.Config
topicRequestWebRtc := config.Key + "/register"
mqttClient.Subscribe(topicRequestWebRtc, 0, func(c mqtt.Client, msg mqtt.Message) {
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc.")
var sdp models.SDPPayload
json.Unmarshal(msg.Payload(), &sdp)
select {
case communication.HandleLiveHDHandshake <- sdp:
default:
if communication.CameraConnected {
var sdp models.SDPPayload
json.Unmarshal(msg.Payload(), &sdp)
select {
case communication.HandleLiveHDHandshake <- sdp:
default:
}
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc.")
} else {
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc, but camera is not connected.")
}
msg.Ack()
})
@@ -145,9 +180,13 @@ func MQTTListenerHandleLiveHDKeepalive(mqttClient mqtt.Client, hubKey string, co
config := configuration.Config
topicKeepAlive := fmt.Sprintf("kerberos/webrtc/keepalivehub/%s", config.Key)
mqttClient.Subscribe(topicKeepAlive, 0, func(c mqtt.Client, msg mqtt.Message) {
alive := string(msg.Payload())
communication.HandleLiveHDKeepalive <- alive
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: Received keepalive: " + alive)
if communication.CameraConnected {
alive := string(msg.Payload())
communication.HandleLiveHDKeepalive <- alive
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: Received keepalive: " + alive)
} else {
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: received keepalive, but camera is not connected.")
}
})
}
@@ -155,9 +194,13 @@ func MQTTListenerHandleLiveHDPeers(mqttClient mqtt.Client, hubKey string, config
config := configuration.Config
topicPeers := fmt.Sprintf("kerberos/webrtc/peers/%s", config.Key)
mqttClient.Subscribe(topicPeers, 0, func(c mqtt.Client, msg mqtt.Message) {
peerCount := string(msg.Payload())
communication.HandleLiveHDPeers <- peerCount
log.Log.Info("MQTTListenerHandleLiveHDPeers: Number of peers listening: " + peerCount)
if communication.CameraConnected {
peerCount := string(msg.Payload())
communication.HandleLiveHDPeers <- peerCount
log.Log.Info("MQTTListenerHandleLiveHDPeers: Number of peers listening: " + peerCount)
} else {
log.Log.Info("MQTTListenerHandleLiveHDPeers: received peer count, but camera is not connected.")
}
})
}
@@ -165,19 +208,23 @@ func MQTTListenerHandleLiveHDCandidates(mqttClient mqtt.Client, hubKey string, c
config := configuration.Config
topicCandidates := "candidate/cloud"
mqttClient.Subscribe(topicCandidates, 0, func(c mqtt.Client, msg mqtt.Message) {
var candidate models.Candidate
json.Unmarshal(msg.Payload(), &candidate)
if candidate.CloudKey == config.Key {
key := candidate.CloudKey + "/" + candidate.Cuuid
candidatesExists := false
var channel chan string
for !candidatesExists {
webrtc.CandidatesMutex.Lock()
channel, candidatesExists = webrtc.CandidateArrays[key]
webrtc.CandidatesMutex.Unlock()
if communication.CameraConnected {
var candidate models.Candidate
json.Unmarshal(msg.Payload(), &candidate)
if candidate.CloudKey == config.Key {
key := candidate.CloudKey + "/" + candidate.Cuuid
candidatesExists := false
var channel chan string
for !candidatesExists {
webrtc.CandidatesMutex.Lock()
channel, candidatesExists = webrtc.CandidateArrays[key]
webrtc.CandidatesMutex.Unlock()
}
log.Log.Info("MQTTListenerHandleLiveHDCandidates: " + string(msg.Payload()))
channel <- string(msg.Payload())
}
log.Log.Info("MQTTListenerHandleLiveHDCandidates: " + string(msg.Payload()))
channel <- string(msg.Payload())
} else {
log.Log.Info("MQTTListenerHandleLiveHDCandidates: received candidate, but camera is not connected.")
}
})
}
@@ -186,23 +233,28 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
config := configuration.Config
topicOnvif := fmt.Sprintf("kerberos/onvif/%s", config.Key)
mqttClient.Subscribe(topicOnvif, 0, func(c mqtt.Client, msg mqtt.Message) {
var onvifAction models.OnvifAction
json.Unmarshal(msg.Payload(), &onvifAction)
communication.HandleONVIF <- onvifAction
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
if communication.CameraConnected {
var onvifAction models.OnvifAction
json.Unmarshal(msg.Payload(), &onvifAction)
communication.HandleONVIF <- onvifAction
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
} else {
log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.")
}
})
}
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
if mqttClient != nil {
// Cleanup all subscriptions.
mqttClient.Unsubscribe("kerberos/" + config.HubKey + "/device/" + config.Key + "/request-live")
mqttClient.Unsubscribe(config.Key + "/register")
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + config.Key)
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + config.Key)
// Cleanup all subscriptions
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
mqttClient.Unsubscribe("candidate/cloud")
mqttClient.Unsubscribe("kerberos/onvif/" + config.Key)
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
mqttClient.Disconnect(1000)
mqttClient = nil
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
}
}

View File

@@ -224,13 +224,15 @@ class Settings extends React.Component {
calculateTimetable(timetable) {
this.timetable = timetable;
for (let i = 0; i < timetable.length; i += 1) {
const time = timetable[i];
const { start1, start2, end1, end2 } = time;
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
if (this.timetable) {
for (let i = 0; i < timetable.length; i += 1) {
const time = timetable[i];
const { start1, start2, end1, end2 } = time;
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
}
}
}