Compare commits

...

5 Commits

Author SHA1 Message Date
Cedric Verstraeten
47f4c19617 Update Config.go 2023-09-13 08:14:25 +02:00
Cedric Verstraeten
280a81809a Update Config.go 2023-09-12 22:38:26 +02:00
Cedric Verstraeten
59358acb30 add logging + empty friendly name 2023-09-12 15:17:56 +02:00
Cedric Verstraeten
ebd655ac73 Allow remote configuration through MQTT + restructure config method 2023-09-12 10:50:36 +02:00
Cedric Verstraeten
6325e37aae empty presets caused hub connection failing 2023-09-07 08:16:46 +02:00
7 changed files with 165 additions and 25 deletions

View File

@@ -9,6 +9,8 @@ import (
"github.com/kerberos-io/agent/machinery/src/components"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/routers"
"github.com/kerberos-io/agent/machinery/src/utils"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -92,10 +94,10 @@ func main() {
configuration.Port = port
// Open this configuration either from Kerberos Agent or Kerberos Factory.
components.OpenConfig(configDirectory, &configuration)
configService.OpenConfig(configDirectory, &configuration)
// We will override the configuration with the environment variables
components.OverrideWithEnvironmentVariables(&configuration)
configService.OverrideWithEnvironmentVariables(&configuration)
// Printing final configuration
utils.PrintConfiguration(&configuration)
@@ -113,7 +115,7 @@ func main() {
if configuration.Config.Key == "" {
key := utils.RandStringBytesMaskImpr(30)
configuration.Config.Key = key
err := components.StoreConfig(configDirectory, configuration.Config)
err := configService.StoreConfig(configDirectory, configuration.Config)
if err == nil {
log.Log.Info("Main: updated unique key for agent to: " + key)
} else {

View File

@@ -302,10 +302,27 @@ loop:
onvifPresetsList, err = json.Marshal(presets)
if err != nil {
log.Log.Error("HandleHeartBeat: error while marshalling presets: " + err.Error())
onvifPresetsList = []byte("[]")
}
} else {
if err != nil {
log.Log.Error("HandleHeartBeat: error while getting presets: " + err.Error())
} else {
log.Log.Debug("HandleHeartBeat: no presets found.")
}
onvifPresetsList = []byte("[]")
}
} else {
log.Log.Error("HandleHeartBeat: error while getting PTZ configurations: " + err.Error())
onvifPresetsList = []byte("[]")
}
} else {
log.Log.Error("HandleHeartBeat: error while connecting to ONVIF device: " + err.Error())
onvifPresetsList = []byte("[]")
}
} else {
log.Log.Debug("HandleHeartBeat: ONVIF is not enabled.")
onvifPresetsList = []byte("[]")
}
// Check if the agent is running inside a cluster (Kerberos Factory) or as

View File

@@ -14,6 +14,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/cloud"
"github.com/kerberos-io/agent/machinery/src/computervision"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/onvif"
@@ -72,7 +73,7 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
// We'll create a MQTT handler, which will be used to communicate with Kerberos Hub.
// Configure a MQTT client which helps for a bi-directional communication
mqttClient := routers.ConfigureMQTT(configuration, communication)
mqttClient := routers.ConfigureMQTT(configDirectory, configuration, communication)
// Run the agent and fire up all the other
// goroutines which do image capture, motion detection, onvif, etc.
@@ -87,15 +88,15 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
if status == "not started" {
// We will re open the configuration, might have changed :O!
OpenConfig(configDirectory, configuration)
configService.OpenConfig(configDirectory, configuration)
// We will override the configuration with the environment variables
OverrideWithEnvironmentVariables(configuration)
configService.OverrideWithEnvironmentVariables(configuration)
}
// Reset the MQTT client, might have provided new information, so we need to reconnect.
if routers.HasMQTTClientModified(configuration) {
routers.DisconnectMQTT(mqttClient, &configuration.Config)
mqttClient = routers.ConfigureMQTT(configuration, communication)
mqttClient = routers.ConfigureMQTT(configDirectory, configuration, communication)
}
// We will create a new cancelable context, which will be used to cancel and restart.
@@ -134,6 +135,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
// Set config values as well
configuration.Config.Capture.IPCamera.Width = width
configuration.Config.Capture.IPCamera.Height = height
var queue *pubsub.Queue
var subQueue *pubsub.Queue
@@ -162,6 +167,13 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
time.Sleep(time.Second * 3)
return status
}
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
// Set config values as well
configuration.Config.Capture.IPCamera.Width = width
configuration.Config.Capture.IPCamera.Height = height
}
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
@@ -190,6 +202,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
cameraSettings.Denum = denum
cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
cameraSettings.Initialized = true
} else {
log.Log.Info("RunAgent: camera settings did not change, keeping decoder")
}
@@ -285,10 +298,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
(*communication.CancelContext)()
// We will re open the configuration, might have changed :O!
OpenConfig(configDirectory, configuration)
configService.OpenConfig(configDirectory, configuration)
// We will override the configuration with the environment variables
OverrideWithEnvironmentVariables(configuration)
configService.OverrideWithEnvironmentVariables(configuration)
// Here we are cleaning up everything!
if configuration.Config.Offline != "true" {

View File

@@ -1,4 +1,4 @@
package components
package config
import (
"context"
@@ -210,7 +210,7 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
configuration.Config.Key = value
break
case "AGENT_NAME":
configuration.Config.Name = value
configuration.Config.FriendlyName = value
break
case "AGENT_TIMEZONE":
configuration.Config.Timezone = value

View File

@@ -21,7 +21,7 @@ type Config struct {
AutoClean string `json:"auto_clean"`
RemoveAfterUpload string `json:"remove_after_upload"`
MaxDirectorySize int64 `json:"max_directory_size"`
Timezone string `json:"timezone,omitempty" bson:"timezone,omitempty"`
Timezone string `json:"timezone"`
Capture Capture `json:"capture"`
Timetable []*Timetable `json:"timetable"`
Region *Region `json:"region"`
@@ -70,13 +70,15 @@ type Capture struct {
// IPCamera configuration, such as the RTSP url of the IPCamera and the FPS.
// Also includes ONVIF integration
type IPCamera struct {
Width int `json:"width"`
Height int `json:"height"`
FPS string `json:"fps"`
RTSP string `json:"rtsp"`
SubRTSP string `json:"sub_rtsp"`
FPS string `json:"fps"`
ONVIF string `json:"onvif,omitempty" bson:"onvif"`
ONVIFXAddr string `json:"onvif_xaddr,omitempty" bson:"onvif_xaddr"`
ONVIFUsername string `json:"onvif_username,omitempty" bson:"onvif_username"`
ONVIFPassword string `json:"onvif_password,omitempty" bson:"onvif_password"`
ONVIFXAddr string `json:"onvif_xaddr" bson:"onvif_xaddr"`
ONVIFUsername string `json:"onvif_username" bson:"onvif_username"`
ONVIFPassword string `json:"onvif_password" bson:"onvif_password"`
}
// USBCamera configuration, such as the device path (/dev/video*)

View File

@@ -12,6 +12,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/cloud"
"github.com/kerberos-io/agent/machinery/src/components"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/utils"
@@ -40,7 +41,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
var config models.Config
err := c.BindJSON(&config)
if err == nil {
err := components.SaveConfig(configDirectory, config, configuration, communication)
err := configService.SaveConfig(configDirectory, config, configuration, communication)
if err == nil {
c.JSON(200, gin.H{
"data": "☄ Reconfiguring",
@@ -165,7 +166,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
var config models.Config
err := c.BindJSON(&config)
if err == nil {
err := components.SaveConfig(configDirectory, config, configuration, communication)
err := configService.SaveConfig(configDirectory, config, configuration, communication)
if err == nil {
c.JSON(200, gin.H{
"data": "☄ Reconfiguring",
@@ -215,7 +216,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// We will only send an image once per second.
time.Sleep(time.Second * 1)
log.Log.Info("AddRoutes (/stream): reading from MJPEG stream")
img, err := components.GetImageFromFilePath(configDirectory)
img, err := configService.GetImageFromFilePath(configDirectory)
return img, err
}
h := components.StartMotionJPEG(imageFunction, 80)

View File

@@ -8,6 +8,8 @@ import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gofrs/uuid"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/onvif"
@@ -55,12 +57,22 @@ func HasMQTTClientModified(configuration *models.Configuration) bool {
}
func PackageMQTTMessage(msg Message) ([]byte, error) {
// We'll generate an unique id, and encrypt it using the private key.
msg.Mid = "0123456789+1"
// Create a Version 4 UUID.
u2, err := uuid.NewV4()
if err != nil {
log.Log.Error("failed to generate UUID: " + err.Error())
}
// We'll generate an unique id, and encrypt / decrypt it using the private key if available.
msg.Mid = u2.String()
msg.Timestamp = time.Now().Unix()
// At the moment we don't do the encryption part, but we'll implement it
// once the legacy methods (subscriptions are moved).
msg.Encrypted = false
msg.PublicKey = ""
msg.Fingerprint = ""
payload, err := json.Marshal(msg)
return payload, err
}
@@ -98,7 +110,7 @@ func PackageMQTTMessage(msg Message) ([]byte, error) {
// - {devicekey}/{sessionid}/answer: once a WebRTC request is received through (kerberos/register), we'll draft an answer and send it back to the remote WebRTC client.
// - kerberos/{hubkey}/device/{devicekey}/motion: a motion signal
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
func ConfigureMQTT(configDirectory string, configuration *models.Configuration, communication *models.Communication) mqtt.Client {
config := configuration.Config
@@ -174,8 +186,9 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
// Create a susbcription for listen and reply
MQTTListenerHandler(c, hubKey, configuration, communication)
MQTTListenerHandler(c, hubKey, configDirectory, configuration, communication)
// Legacy methods below -> should be converted to the above method.
// Create a subscription to know if send out a livestream or not.
MQTTListenerHandleLiveSD(c, hubKey, configuration, communication)
@@ -207,7 +220,7 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
return nil
}
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("MQTTListenerHandler: no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
@@ -225,6 +238,7 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *m
var message Message
json.Unmarshal(msg.Payload(), &message)
if message.Mid != "" && message.Timestamp != 0 {
// Messages might be encrypted, if so we'll
// need to decrypt them.
@@ -242,6 +256,8 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *m
// Not relevant for this device, so we'll ignore it.
} else {
// We'll find out which message we received, and act accordingly.
log.Log.Info("MQTTListenerHandler: received message with action: " + payload.Action)
switch payload.Action {
case "record":
HandleRecording(mqttClient, hubKey, payload, configuration, communication)
@@ -249,6 +265,10 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *m
HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
case "update-ptz-position":
HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication)
case "request-config":
HandleRequestConfig(mqttClient, hubKey, payload, configuration, communication)
case "update-config":
HandleUpdateConfig(mqttClient, hubKey, payload, configDirectory, configuration, communication)
}
}
}
@@ -336,10 +356,95 @@ func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payl
}
}
// We received a request config request, we'll fetch the current config and send it back.
type RequestConfigPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the preset request.
}
func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value
// Convert map[string]interface{} to RequestConfigPayload
jsonData, _ := json.Marshal(value)
var configPayload RequestConfigPayload
json.Unmarshal(jsonData, &configPayload)
if configPayload.Timestamp != 0 {
// Get Config from the device
key := configuration.Config.Key
name := configuration.Config.Name
if key != "" && name != "" {
var configMap map[string]interface{}
inrec, _ := json.Marshal(configuration.Config)
json.Unmarshal(inrec, &configMap)
message := Message{
Payload: Payload{
Action: "receive-config",
DeviceId: configuration.Config.Key,
Value: configMap,
},
}
payload, err := PackageMQTTMessage(message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("HandleRequestConfig: something went wrong while sending config to hub: " + string(payload))
}
} else {
log.Log.Info("HandleRequestConfig: no config available")
}
log.Log.Info("HandleRequestConfig: Received a request for the config")
}
}
// We received a update config request, we'll update the current config and send a confirmation back.
type UpdateConfigPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the preset request.
Config models.Config `json:"config"`
}
func HandleUpdateConfig(mqttClient mqtt.Client, hubKey string, payload Payload, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value
// Convert map[string]interface{} to UpdateConfigPayload
jsonData, _ := json.Marshal(value)
var configPayload UpdateConfigPayload
json.Unmarshal(jsonData, &configPayload)
if configPayload.Timestamp != 0 {
config := configPayload.Config
err := configService.SaveConfig(configDirectory, config, configuration, communication)
if err == nil {
log.Log.Info("HandleUpdateConfig: Config updated")
message := Message{
Payload: Payload{
Action: "acknowledge-update-config",
DeviceId: configuration.Config.Key,
},
}
payload, err := PackageMQTTMessage(message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("HandleRequestConfig: something went wrong while sending acknowledge config to hub: " + string(payload))
}
} else {
log.Log.Info("HandleUpdateConfig: Config update failed")
}
}
}
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
if mqttClient != nil {
// Cleanup all subscriptions
// New methods
mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey)