mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-06 21:51:07 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9cf9babd73 | ||
|
|
229c246e1c | ||
|
|
15d9bcda4f | ||
|
|
068063695e | ||
|
|
b1722844f3 |
@@ -2,6 +2,9 @@ module github.com/kerberos-io/agent/machinery
|
||||
|
||||
go 1.19
|
||||
|
||||
// replace github.com/kerberos-io/joy4 v1.0.57 => ../../../../github.com/kerberos-io/joy4
|
||||
// replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
|
||||
|
||||
require (
|
||||
github.com/InVisionApp/conjungo v1.1.0
|
||||
github.com/appleboy/gin-jwt/v2 v2.9.1
|
||||
@@ -21,7 +24,7 @@ require (
|
||||
github.com/golang-module/carbon/v2 v2.2.3
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.57
|
||||
github.com/kerberos-io/joy4 v1.0.58
|
||||
github.com/kerberos-io/onvif v0.0.5
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
|
||||
@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.57 h1:/8epNAJv4cOzBG8pFiM9hVNXfwsgA+8/2nHQ2yOeyII=
|
||||
github.com/kerberos-io/joy4 v1.0.57/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
|
||||
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
|
||||
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@@ -16,7 +17,6 @@ import (
|
||||
var VERSION = "3.0.0"
|
||||
|
||||
func main() {
|
||||
|
||||
// You might be interested in debugging the agent.
|
||||
if os.Getenv("DATADOG_AGENT_ENABLED") == "true" {
|
||||
if os.Getenv("DATADOG_AGENT_K8S_ENABLED") == "true" {
|
||||
@@ -111,8 +111,14 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// Create a cancelable context, which will be used to cancel and restart.
|
||||
// This is used to restart the agent when the configuration is updated.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Bootstrapping the agent
|
||||
communication := models.Communication{
|
||||
Context: &ctx,
|
||||
CancelContext: &cancel,
|
||||
HandleBootstrap: make(chan string, 1),
|
||||
}
|
||||
go components.Bootstrap(&configuration, &communication)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package capture
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -15,9 +16,9 @@ import (
|
||||
"github.com/kerberos-io/joy4/format"
|
||||
)
|
||||
|
||||
func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
format.RegisterAll()
|
||||
infile, err := avutil.Open(url)
|
||||
infile, err := avutil.Open(ctx, url)
|
||||
if err == nil {
|
||||
streams, errstreams := infile.Streams()
|
||||
return infile, streams, errstreams
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
package capture
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
@@ -431,6 +432,10 @@ func VerifyCamera(c *gin.Context) {
|
||||
var cameraStreams models.CameraStreams
|
||||
err := c.BindJSON(&cameraStreams)
|
||||
|
||||
// Should return in 5 seconds.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err == nil {
|
||||
|
||||
streamType := c.Param("streamType")
|
||||
@@ -442,7 +447,7 @@ func VerifyCamera(c *gin.Context) {
|
||||
if streamType == "secondary" {
|
||||
rtspUrl = cameraStreams.SubRTSP
|
||||
}
|
||||
_, codecs, err := OpenRTSP(rtspUrl)
|
||||
_, codecs, err := OpenRTSP(ctx, rtspUrl)
|
||||
if err == nil {
|
||||
|
||||
videoIdx := -1
|
||||
|
||||
@@ -83,17 +83,25 @@ func OpenConfig(configuration *models.Configuration) {
|
||||
db := client.Database(database.DatabaseName)
|
||||
collection := db.Collection("configuration")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
collection.FindOne(ctx, bson.M{
|
||||
var globalConfig models.Config
|
||||
err := collection.FindOne(context.Background(), bson.M{
|
||||
"type": "global",
|
||||
}).Decode(&configuration.GlobalConfig)
|
||||
}).Decode(&globalConfig)
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find global configuration, using default configuration.")
|
||||
}
|
||||
configuration.GlobalConfig = globalConfig
|
||||
|
||||
collection.FindOne(ctx, bson.M{
|
||||
var customConfig models.Config
|
||||
deploymentName := os.Getenv("DEPLOYMENT_NAME")
|
||||
err = collection.FindOne(context.Background(), bson.M{
|
||||
"type": "config",
|
||||
"name": os.Getenv("DEPLOYMENT_NAME"),
|
||||
}).Decode(&configuration.CustomConfig)
|
||||
"name": deploymentName,
|
||||
}).Decode(&customConfig)
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
|
||||
}
|
||||
configuration.CustomConfig = customConfig
|
||||
|
||||
// We will merge both configs in a single config file.
|
||||
// Read again from database but this store overwrite the same object.
|
||||
@@ -209,8 +217,7 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
|
||||
|
||||
/* ONVIF connnection settings */
|
||||
case "AGENT_CAPTURE_IPCAMERA_ONVIF":
|
||||
isEnabled := value == " true"
|
||||
configuration.Config.Capture.IPCamera.ONVIF = isEnabled
|
||||
configuration.Config.Capture.IPCamera.ONVIF = value
|
||||
break
|
||||
case "AGENT_CAPTURE_IPCAMERA_ONVIF_XADDR":
|
||||
configuration.Config.Capture.IPCamera.ONVIFXAddr = value
|
||||
@@ -440,9 +447,11 @@ func SaveConfig(config models.Config, configuration *models.Configuration, commu
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case communication.HandleBootstrap <- "restart":
|
||||
default:
|
||||
if communication.CameraConnected {
|
||||
select {
|
||||
case communication.HandleBootstrap <- "restart":
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
communication.IsConfiguring.UnSet()
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
@@ -53,6 +55,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
communication.HandleLiveSD = make(chan int64, 1)
|
||||
communication.HandleLiveHDKeepalive = make(chan string, 1)
|
||||
communication.HandleLiveHDPeers = make(chan string, 1)
|
||||
communication.HandleONVIF = make(chan models.OnvifAction, 1)
|
||||
communication.IsConfiguring = abool.New()
|
||||
|
||||
// Before starting the agent, we have a control goroutine, that might
|
||||
@@ -67,33 +70,71 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
// Handle heartbeats
|
||||
go cloud.HandleHeartBeat(configuration, communication, uptimeStart)
|
||||
|
||||
// We'll create a MQTT handler, which will be used to communicate with Kerberos Hub.
|
||||
// Configure a MQTT client which helps for a bi-directional communication
|
||||
mqttClient := routers.ConfigureMQTT(configuration, communication)
|
||||
|
||||
// Run the agent and fire up all the other
|
||||
// goroutines which do image capture, motion detection, onvif, etc.
|
||||
|
||||
for {
|
||||
|
||||
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
|
||||
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
|
||||
status := RunAgent(configuration, communication, mqttClient, uptimeStart, cameraSettings, decoder, subDecoder)
|
||||
|
||||
if status == "stop" {
|
||||
break
|
||||
}
|
||||
|
||||
// We will reconfigure or restart the agent, we will mark the agent as not connected.
|
||||
communication.CameraConnected = false
|
||||
|
||||
// We will re open the configuration, might have changed :O!
|
||||
OpenConfig(configuration)
|
||||
|
||||
// We will override the configuration with the environment variables
|
||||
OverrideWithEnvironmentVariables(configuration)
|
||||
|
||||
// Reset the MQTT client, might have provided new information, so we need to reconnect.
|
||||
if routers.HasMQTTClientModified(configuration) {
|
||||
routers.DisconnectMQTT(mqttClient, &configuration.Config)
|
||||
mqttClient = routers.ConfigureMQTT(configuration, communication)
|
||||
}
|
||||
|
||||
// We will create a new cancelable context, which will be used to cancel and restart.
|
||||
// This is used to restart the agent when the configuration is updated.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
communication.Context = &ctx
|
||||
communication.CancelContext = &cancel
|
||||
}
|
||||
log.Log.Debug("Bootstrap: finished")
|
||||
}
|
||||
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
|
||||
|
||||
log.Log.Debug("RunAgent: bootstrapping agent")
|
||||
config := configuration.Config
|
||||
|
||||
status := "not started"
|
||||
|
||||
// Currently only support H264 encoded cameras, this will change.
|
||||
// Establishing the camera connection
|
||||
rtspUrl := config.Capture.IPCamera.RTSP
|
||||
infile, streams, err := capture.OpenRTSP(rtspUrl)
|
||||
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
|
||||
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
// to reload the decoders.
|
||||
|
||||
videoStream, _ := capture.GetVideoStream(streams)
|
||||
if videoStream == nil {
|
||||
log.Log.Error("RunAgent: no video stream found, might be the wrong codec (we only support H264 for the moment)")
|
||||
time.Sleep(time.Second * 3)
|
||||
return status
|
||||
}
|
||||
|
||||
num, denum := videoStream.(av.VideoCodecData).Framerate()
|
||||
width := videoStream.(av.VideoCodecData).Width()
|
||||
height := videoStream.(av.VideoCodecData).Height()
|
||||
|
||||
var queue *pubsub.Queue
|
||||
var subQueue *pubsub.Queue
|
||||
@@ -101,11 +142,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
var decoderMutex sync.Mutex
|
||||
var subDecoderMutex sync.Mutex
|
||||
|
||||
status := "not started"
|
||||
|
||||
if err == nil {
|
||||
|
||||
log.Log.Info("RunAgent: opened RTSP stream" + rtspUrl)
|
||||
log.Log.Info("RunAgent: opened RTSP stream: " + rtspUrl)
|
||||
|
||||
// We might have a secondary rtsp url, so we might need to use that.
|
||||
var subInfile av.DemuxCloser
|
||||
@@ -113,23 +152,21 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
subStreamEnabled := false
|
||||
subRtspUrl := config.Capture.IPCamera.SubRTSP
|
||||
if subRtspUrl != "" && subRtspUrl != rtspUrl {
|
||||
subInfile, subStreams, err = capture.OpenRTSP(subRtspUrl)
|
||||
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
|
||||
if err == nil {
|
||||
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
|
||||
subStreamEnabled = true
|
||||
}
|
||||
|
||||
videoStream, _ := capture.GetVideoStream(subStreams)
|
||||
if videoStream == nil {
|
||||
log.Log.Error("RunAgent: no video substream found, might be the wrong codec (we only support H264 for the moment)")
|
||||
time.Sleep(time.Second * 3)
|
||||
return status
|
||||
}
|
||||
}
|
||||
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
// to reload the decoders.
|
||||
videoStream, _ := capture.GetVideoStream(streams)
|
||||
num, denum := videoStream.(av.VideoCodecData).Framerate()
|
||||
width := videoStream.(av.VideoCodecData).Width()
|
||||
height := videoStream.(av.VideoCodecData).Height()
|
||||
|
||||
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
|
||||
|
||||
if cameraSettings.Initialized {
|
||||
decoder.Close()
|
||||
if subStreamEnabled {
|
||||
@@ -187,9 +224,8 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
subQueue.WriteHeader(subStreams)
|
||||
}
|
||||
|
||||
// Configure a MQTT client which helps for a bi-directional communication
|
||||
communication.HandleONVIF = make(chan models.OnvifAction, 1)
|
||||
mqttClient := routers.ConfigureMQTT(configuration, communication)
|
||||
// If we reach this point, we have a working RTSP connection.
|
||||
communication.CameraConnected = true
|
||||
|
||||
// Handle the camera stream
|
||||
go capture.HandleStream(infile, queue, communication)
|
||||
@@ -240,8 +276,12 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
// This will go into a blocking state, once this channel is triggered
|
||||
// the agent will cleanup and restart.
|
||||
|
||||
status = <-communication.HandleBootstrap
|
||||
|
||||
// Cancel the main context, this will stop all the other goroutines.
|
||||
(*communication.CancelContext)()
|
||||
|
||||
// Here we are cleaning up everything!
|
||||
if configuration.Config.Offline != "true" {
|
||||
communication.HandleUpload <- "stop"
|
||||
@@ -265,16 +305,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
subQueue = nil
|
||||
communication.SubQueue = nil
|
||||
}
|
||||
close(communication.HandleONVIF)
|
||||
communication.HandleONVIF = nil
|
||||
close(communication.HandleLiveHDHandshake)
|
||||
communication.HandleLiveHDHandshake = nil
|
||||
close(communication.HandleMotion)
|
||||
communication.HandleMotion = nil
|
||||
|
||||
// Disconnect MQTT
|
||||
routers.DisconnectMQTT(mqttClient, &configuration.Config)
|
||||
|
||||
// Wait a few seconds to stop the decoder.
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
@@ -303,29 +336,32 @@ func ControlAgent(communication *models.Communication) {
|
||||
var previousPacket int64 = 0
|
||||
var occurence = 0
|
||||
for {
|
||||
packetsR := packageCounter.Load().(int64)
|
||||
if packetsR == previousPacket {
|
||||
// If we are already reconfiguring,
|
||||
// we dont need to check if the stream is blocking.
|
||||
if !communication.IsConfiguring.IsSet() {
|
||||
occurence = occurence + 1
|
||||
|
||||
// If camera is connected, we'll check if we are still receiving packets.
|
||||
if communication.CameraConnected {
|
||||
packetsR := packageCounter.Load().(int64)
|
||||
if packetsR == previousPacket {
|
||||
// If we are already reconfiguring,
|
||||
// we dont need to check if the stream is blocking.
|
||||
if !communication.IsConfiguring.IsSet() {
|
||||
occurence = occurence + 1
|
||||
}
|
||||
} else {
|
||||
occurence = 0
|
||||
}
|
||||
} else {
|
||||
|
||||
occurence = 0
|
||||
log.Log.Info("ControlAgent: Number of packets read " + strconv.FormatInt(packetsR, 10))
|
||||
|
||||
// After 15 seconds without activity this is thrown..
|
||||
if occurence == 3 {
|
||||
log.Log.Info("Main: Restarting machinery.")
|
||||
communication.HandleBootstrap <- "restart"
|
||||
time.Sleep(2 * time.Second)
|
||||
occurence = 0
|
||||
}
|
||||
previousPacket = packageCounter.Load().(int64)
|
||||
}
|
||||
|
||||
log.Log.Info("ControlAgent: Number of packets read " + strconv.FormatInt(packetsR, 10))
|
||||
|
||||
// After 15 seconds without activity this is thrown..
|
||||
if occurence == 3 {
|
||||
log.Log.Info("Main: Restarting machinery.")
|
||||
communication.HandleBootstrap <- "restart"
|
||||
time.Sleep(2 * time.Second)
|
||||
occurence = 0
|
||||
}
|
||||
previousPacket = packageCounter.Load().(int64)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -10,15 +10,12 @@ import (
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
geo "github.com/kellydunn/golang-geo"
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/joy4/av/pubsub"
|
||||
|
||||
//"github.com/whorfin/go-libjpeg/jpeg"
|
||||
|
||||
geo "github.com/kellydunn/golang-geo"
|
||||
"github.com/kerberos-io/joy4/av"
|
||||
"github.com/kerberos-io/joy4/av/pubsub"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
)
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -12,6 +13,8 @@ import (
|
||||
// The communication struct that is managing
|
||||
// all the communication between the different goroutines.
|
||||
type Communication struct {
|
||||
Context *context.Context
|
||||
CancelContext *context.CancelFunc
|
||||
PackageCounter *atomic.Value
|
||||
LastPacketTimer *atomic.Value
|
||||
CloudTimestamp *atomic.Value
|
||||
|
||||
@@ -73,7 +73,7 @@ type IPCamera struct {
|
||||
RTSP string `json:"rtsp"`
|
||||
SubRTSP string `json:"sub_rtsp"`
|
||||
FPS string `json:"fps"`
|
||||
ONVIF bool `json:"onvif,omitempty" bson:"onvif"`
|
||||
ONVIF string `json:"onvif,omitempty" bson:"onvif"`
|
||||
ONVIFXAddr string `json:"onvif_xaddr,omitempty" bson:"onvif_xaddr"`
|
||||
ONVIFUsername string `json:"onvif_username,omitempty" bson:"onvif_username"`
|
||||
ONVIFPassword string `json:"onvif_password,omitempty" bson:"onvif_password"`
|
||||
|
||||
@@ -13,10 +13,38 @@ import (
|
||||
"github.com/kerberos-io/agent/machinery/src/webrtc"
|
||||
)
|
||||
|
||||
// We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection.
|
||||
// If we update the configuration but no new MQTT settings are provided, we don't need to restart it.
|
||||
var PREV_MQTTURI string
|
||||
var PREV_MQTTUsername string
|
||||
var PREV_MQTTPassword string
|
||||
var PREV_HubKey string
|
||||
var PREV_AgentKey string
|
||||
|
||||
func HasMQTTClientModified(configuration *models.Configuration) bool {
|
||||
MTTURI := configuration.Config.MQTTURI
|
||||
MTTUsername := configuration.Config.MQTTUsername
|
||||
MQTTPassword := configuration.Config.MQTTPassword
|
||||
HubKey := configuration.Config.HubKey
|
||||
AgentKey := configuration.Config.Key
|
||||
if PREV_MQTTURI != MTTURI || PREV_MQTTUsername != MTTUsername || PREV_MQTTPassword != MQTTPassword || PREV_HubKey != HubKey || PREV_AgentKey != AgentKey {
|
||||
log.Log.Info("HasMQTTClientModified: MQTT settings have been modified, restarting MQTT client.")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
|
||||
|
||||
config := configuration.Config
|
||||
|
||||
// Set the MQTT settings.
|
||||
PREV_MQTTURI = configuration.Config.MQTTURI
|
||||
PREV_MQTTUsername = configuration.Config.MQTTUsername
|
||||
PREV_MQTTPassword = configuration.Config.MQTTPassword
|
||||
PREV_HubKey = configuration.Config.HubKey
|
||||
PREV_AgentKey = configuration.Config.Key
|
||||
|
||||
if config.Offline == "true" {
|
||||
log.Log.Info("ConfigureMQTT: not starting as running in Offline mode.")
|
||||
} else {
|
||||
@@ -78,7 +106,6 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
|
||||
webrtc.CandidateArrays = make(map[string](chan string))
|
||||
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
|
||||
// We managed to connect to the MQTT broker, hurray!
|
||||
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
|
||||
|
||||
@@ -117,11 +144,15 @@ func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configurati
|
||||
config := configuration.Config
|
||||
topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live"
|
||||
mqttClient.Subscribe(topicRequest, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
select {
|
||||
case communication.HandleLiveSD <- time.Now().Unix():
|
||||
default:
|
||||
if communication.CameraConnected {
|
||||
select {
|
||||
case communication.HandleLiveSD <- time.Now().Unix():
|
||||
default:
|
||||
}
|
||||
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream.")
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream, but camera is not connected.")
|
||||
}
|
||||
log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream.")
|
||||
msg.Ack()
|
||||
})
|
||||
}
|
||||
@@ -130,12 +161,16 @@ func MQTTListenerHandleLiveHDHandshake(mqttClient mqtt.Client, hubKey string, co
|
||||
config := configuration.Config
|
||||
topicRequestWebRtc := config.Key + "/register"
|
||||
mqttClient.Subscribe(topicRequestWebRtc, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc.")
|
||||
var sdp models.SDPPayload
|
||||
json.Unmarshal(msg.Payload(), &sdp)
|
||||
select {
|
||||
case communication.HandleLiveHDHandshake <- sdp:
|
||||
default:
|
||||
if communication.CameraConnected {
|
||||
var sdp models.SDPPayload
|
||||
json.Unmarshal(msg.Payload(), &sdp)
|
||||
select {
|
||||
case communication.HandleLiveHDHandshake <- sdp:
|
||||
default:
|
||||
}
|
||||
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc.")
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc, but camera is not connected.")
|
||||
}
|
||||
msg.Ack()
|
||||
})
|
||||
@@ -145,9 +180,13 @@ func MQTTListenerHandleLiveHDKeepalive(mqttClient mqtt.Client, hubKey string, co
|
||||
config := configuration.Config
|
||||
topicKeepAlive := fmt.Sprintf("kerberos/webrtc/keepalivehub/%s", config.Key)
|
||||
mqttClient.Subscribe(topicKeepAlive, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
alive := string(msg.Payload())
|
||||
communication.HandleLiveHDKeepalive <- alive
|
||||
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: Received keepalive: " + alive)
|
||||
if communication.CameraConnected {
|
||||
alive := string(msg.Payload())
|
||||
communication.HandleLiveHDKeepalive <- alive
|
||||
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: Received keepalive: " + alive)
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleLiveHDKeepalive: received keepalive, but camera is not connected.")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -155,9 +194,13 @@ func MQTTListenerHandleLiveHDPeers(mqttClient mqtt.Client, hubKey string, config
|
||||
config := configuration.Config
|
||||
topicPeers := fmt.Sprintf("kerberos/webrtc/peers/%s", config.Key)
|
||||
mqttClient.Subscribe(topicPeers, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
peerCount := string(msg.Payload())
|
||||
communication.HandleLiveHDPeers <- peerCount
|
||||
log.Log.Info("MQTTListenerHandleLiveHDPeers: Number of peers listening: " + peerCount)
|
||||
if communication.CameraConnected {
|
||||
peerCount := string(msg.Payload())
|
||||
communication.HandleLiveHDPeers <- peerCount
|
||||
log.Log.Info("MQTTListenerHandleLiveHDPeers: Number of peers listening: " + peerCount)
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleLiveHDPeers: received peer count, but camera is not connected.")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -165,19 +208,23 @@ func MQTTListenerHandleLiveHDCandidates(mqttClient mqtt.Client, hubKey string, c
|
||||
config := configuration.Config
|
||||
topicCandidates := "candidate/cloud"
|
||||
mqttClient.Subscribe(topicCandidates, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
var candidate models.Candidate
|
||||
json.Unmarshal(msg.Payload(), &candidate)
|
||||
if candidate.CloudKey == config.Key {
|
||||
key := candidate.CloudKey + "/" + candidate.Cuuid
|
||||
candidatesExists := false
|
||||
var channel chan string
|
||||
for !candidatesExists {
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
channel, candidatesExists = webrtc.CandidateArrays[key]
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
if communication.CameraConnected {
|
||||
var candidate models.Candidate
|
||||
json.Unmarshal(msg.Payload(), &candidate)
|
||||
if candidate.CloudKey == config.Key {
|
||||
key := candidate.CloudKey + "/" + candidate.Cuuid
|
||||
candidatesExists := false
|
||||
var channel chan string
|
||||
for !candidatesExists {
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
channel, candidatesExists = webrtc.CandidateArrays[key]
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
}
|
||||
log.Log.Info("MQTTListenerHandleLiveHDCandidates: " + string(msg.Payload()))
|
||||
channel <- string(msg.Payload())
|
||||
}
|
||||
log.Log.Info("MQTTListenerHandleLiveHDCandidates: " + string(msg.Payload()))
|
||||
channel <- string(msg.Payload())
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleLiveHDCandidates: received candidate, but camera is not connected.")
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -186,23 +233,28 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
|
||||
config := configuration.Config
|
||||
topicOnvif := fmt.Sprintf("kerberos/onvif/%s", config.Key)
|
||||
mqttClient.Subscribe(topicOnvif, 0, func(c mqtt.Client, msg mqtt.Message) {
|
||||
var onvifAction models.OnvifAction
|
||||
json.Unmarshal(msg.Payload(), &onvifAction)
|
||||
communication.HandleONVIF <- onvifAction
|
||||
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
|
||||
if communication.CameraConnected {
|
||||
var onvifAction models.OnvifAction
|
||||
json.Unmarshal(msg.Payload(), &onvifAction)
|
||||
communication.HandleONVIF <- onvifAction
|
||||
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions.
|
||||
mqttClient.Unsubscribe("kerberos/" + config.HubKey + "/device/" + config.Key + "/request-live")
|
||||
mqttClient.Unsubscribe(config.Key + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + config.Key)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + config.Key)
|
||||
// Cleanup all subscriptions
|
||||
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
|
||||
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + config.Key)
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,13 +224,15 @@ class Settings extends React.Component {
|
||||
|
||||
calculateTimetable(timetable) {
|
||||
this.timetable = timetable;
|
||||
for (let i = 0; i < timetable.length; i += 1) {
|
||||
const time = timetable[i];
|
||||
const { start1, start2, end1, end2 } = time;
|
||||
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
|
||||
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
|
||||
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
|
||||
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
|
||||
if (this.timetable) {
|
||||
for (let i = 0; i < timetable.length; i += 1) {
|
||||
const time = timetable[i];
|
||||
const { start1, start2, end1, end2 } = time;
|
||||
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
|
||||
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
|
||||
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
|
||||
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user