Compare commits

...

2 Commits

13 changed files with 325 additions and 37 deletions

View File

@@ -10,7 +10,7 @@
"request": "launch",
"mode": "auto",
"program": "main.go",
"args": ["-action run"],
"args": ["-action", "run"],
"envFile": "${workspaceFolder}/.env",
"buildFlags": "--tags dynamic",
},

View File

@@ -25,7 +25,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.58
github.com/kerberos-io/onvif v0.0.5
github.com/kerberos-io/onvif v0.0.6
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7

View File

@@ -266,8 +266,8 @@ github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+Mrar
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kerberos-io/onvif v0.0.6 h1:+nvDuxGzQgHjc7V7kiYxUIcw1bO6R9esAMcxWRiKcwA=
github.com/kerberos-io/onvif v0.0.6/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=

View File

@@ -0,0 +1 @@
package api

View File

@@ -19,7 +19,7 @@ import (
func CleanupRecordingDirectory(configDirectory string, configuration *models.Configuration) {
autoClean := configuration.Config.AutoClean
if autoClean != "false" {
if autoClean == "true" {
maxSize := configuration.Config.MaxDirectorySize
if maxSize == 0 {
maxSize = 300

View File

@@ -44,7 +44,7 @@ func UploadKerberosHub(configuration *models.Configuration, fileName string) (bo
if err != nil {
err := "UploadKerberosHub: Upload Failed, file doesn't exists anymore."
log.Log.Info(err)
return false, true, errors.New(err)
return false, false, errors.New(err)
}
// Check if we are allowed to upload to the hub with these credentials.

View File

@@ -44,7 +44,7 @@ func UploadKerberosVault(configuration *models.Configuration, fileName string) (
if err != nil {
err := "UploadKerberosVault: Upload Failed, file doesn't exists anymore."
log.Log.Info(err)
return false, true, errors.New(err)
return false, false, errors.New(err)
}
publicKey := config.KStorage.CloudKey

View File

@@ -90,11 +90,18 @@ func OpenConfig(configDirectory string, configuration *models.Configuration) {
if res.Err() != nil {
log.Log.Error("Could not find global configuration, using default configuration.")
panic("Could not find global configuration, using default configuration.")
}
err := res.Decode(&globalConfig)
if err != nil {
log.Log.Error("Could not find global configuration, using default configuration.")
panic("Could not find global configuration, using default configuration.")
}
if globalConfig.Type != "global" {
log.Log.Error("Could not find global configuration, might missed the mongodb connection.")
panic("Could not find global configuration, might missed the mongodb connection.")
}
configuration.GlobalConfig = globalConfig
var customConfig models.Config
@@ -110,6 +117,11 @@ func OpenConfig(configDirectory string, configuration *models.Configuration) {
if err != nil {
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
}
if customConfig.Type != "config" {
log.Log.Error("Could not find custom configuration, might missed the mongodb connection.")
panic("Could not find custom configuration, might missed the mongodb connection.")
}
configuration.CustomConfig = customConfig
// We will merge both configs in a single config file.

View File

@@ -41,7 +41,8 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
log.Log.Info("ProcessMotion: Motion detection enabled.")
key := config.HubKey
hubKey := config.HubKey
deviceKey := config.Key
// Allocate a VideoFrame
frame := ffmpeg.AllocVideoFrame()
@@ -167,10 +168,10 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
// If offline mode is disabled, send a message to the hub
if config.Offline != "true" {
if mqttClient != nil {
if key != "" {
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
if hubKey != "" {
mqttClient.Publish("kerberos/"+hubKey+"/device/"+deviceKey+"/motion", 2, false, "motion")
} else {
mqttClient.Publish("kerberos/device/"+config.Key+"/motion", 2, false, "motion")
mqttClient.Publish("kerberos/device/"+deviceKey+"/motion", 2, false, "motion")
}
}
}

View File

@@ -12,4 +12,7 @@ type OnvifActionPTZ struct {
Down int `json:"down" bson:"down"`
Center int `json:"center" bson:"center"`
Zoom float64 `json:"zoom" bson:"zoom"`
X float64 `json:"x" bson:"x"`
Y float64 `json:"y" bson:"y"`
Z float64 `json:"z" bson:"z"`
}

View File

@@ -45,14 +45,25 @@ func HandleONVIFActions(configuration *models.Configuration, communication *mode
if err == nil {
if onvifAction.Action == "ptz" {
if onvifAction.Action == "absolute-move" {
// We will move the camera to zero position.
x := ptzAction.X
y := ptzAction.Y
z := ptzAction.Z
err := AbsolutePanTiltMove(device, configurations, token, x, y, z)
if err != nil {
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
}
} else if onvifAction.Action == "ptz" {
if err == nil {
if ptzAction.Center == 1 {
// We will move the camera to zero position.
err := AbsolutePanTiltMove(device, configurations, token, 0, 0)
err := AbsolutePanTiltMove(device, configurations, token, 0, 0, 0)
if err != nil {
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
}
@@ -179,18 +190,73 @@ func GetPTZConfigurationsFromDevice(device *onvif.Device) (ptz.GetConfigurations
return configurations, err
}
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float32, tilt float32) error {
func GetPositionFromDevice(configuration models.Configuration) (xsd.PTZVector, error) {
absoluteVector := xsd.Vector2D{
X: float64(pan),
Y: float64(tilt),
// We'll try to receive the PTZ configurations from the server
var status ptz.GetStatusResponse
var position xsd.PTZVector
// Connect to Onvif device
cameraConfiguration := configuration.Config.Capture.IPCamera
device, err := ConnectToOnvifDevice(&cameraConfiguration)
if err == nil {
// Get token from the first profile
token, err := GetTokenFromProfile(device, 0)
if err == nil {
// Get the PTZ configurations from the device
resp, err := device.CallMethod(ptz.GetStatus{
ProfileToken: token,
})
if err == nil {
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err == nil {
stringBody := string(b)
decodedXML, et, err := getXMLNode(stringBody, "GetStatusResponse")
if err != nil {
log.Log.Error("GetPositionFromDevice: " + err.Error())
return position, err
} else {
if err := decodedXML.DecodeElement(&status, et); err != nil {
log.Log.Error("GetPositionFromDevice: " + err.Error())
return position, err
}
}
}
}
position = status.PTZStatus.Position
return position, err
} else {
log.Log.Error("GetPositionFromDevice: " + err.Error())
return position, err
}
} else {
log.Log.Error("GetPositionFromDevice: " + err.Error())
return position, err
}
}
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, tilt float64, zoom float64) error {
absolutePantiltVector := xsd.Vector2D{
X: pan,
Y: tilt,
Space: configuration.PTZConfiguration.DefaultAbsolutePantTiltPositionSpace,
}
absoluteZoomVector := xsd.Vector1D{
X: zoom,
Space: configuration.PTZConfiguration.DefaultAbsoluteZoomPositionSpace,
}
res, err := device.CallMethod(ptz.AbsoluteMove{
ProfileToken: token,
Position: xsd.PTZVector{
PanTilt: absoluteVector,
PanTilt: absolutePantiltVector,
Zoom: absoluteZoomVector,
},
})

View File

@@ -10,9 +10,29 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/onvif"
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
// The message structure which is used to send over
// and receive messages from the MQTT broker
type Message struct {
Mid string `json:"mid"`
Timestamp int64 `json:"timestamp"`
Encrypted bool `json:"encrypted"`
PublicKey string `json:"public_key"`
Fingerprint string `json:"fingerprint"`
Payload Payload `json:"payload"`
}
// The payload structure which is used to send over
// and receive messages from the MQTT broker
type Payload struct {
Action string `json:"action"`
DeviceId string `json:"device_id"`
Value map[string]interface{} `json:"value"`
}
// We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection.
// If we update the configuration but no new MQTT settings are provided, we don't need to restart it.
var PREV_MQTTURI string
@@ -34,6 +54,50 @@ func HasMQTTClientModified(configuration *models.Configuration) bool {
return false
}
func PackageMQTTMessage(msg Message) ([]byte, error) {
// We'll generate an unique id, and encrypt it using the private key.
msg.Mid = "0123456789+1"
msg.Timestamp = time.Now().Unix()
msg.Encrypted = false
msg.PublicKey = ""
msg.Fingerprint = ""
payload, err := json.Marshal(msg)
return payload, err
}
// Configuring MQTT to subscribe for various bi-directional messaging
// Listen and reply (a generic method to share and retrieve information)
//
// !!! NEW METHOD TO COMMUNICATE: only create a single subscription for all communication.
// and an additional publish messages back
//
// - [SUBSCRIPTION] kerberos/agent/{hubkey} (hub -> agent)
// - [PUBLISH] kerberos/hub/{hubkey} (agent -> hub)
//
// !!! LEGACY METHODS BELOW, WE SHOULD LEVERAGE THE ABOVE METHOD!
//
// [SUBSCRIPTIONS]
//
// SD Streaming (Base64 JPEGs)
// - kerberos/{hubkey}/device/{devicekey}/request-live: use for polling of SD live streaming (as long the user requests stream, we'll send JPEGs over).
//
// HD Streaming (WebRTC)
// - kerberos/register: use for receiving HD live streaming requests.
// - candidate/cloud: remote ICE candidates are shared over this line.
// - kerberos/webrtc/keepalivehub/{devicekey}: use for polling of HD streaming (as long the user requests stream, we'll send it over).
// - kerberos/webrtc/peers/{devicekey}: we'll keep track of the number of peers (we can have more than 1 concurrent listeners).
//
// ONVIF capabilities
// - kerberos/onvif/{devicekey}: endpoint to execute ONVIF commands such as (PTZ, Zoom, IO, etc)
//
// [PUBlISH]
// Next to subscribing to various topics, we'll also publish messages to various topics, find a list of available Publish methods.
//
// - kerberos/webrtc/packets/{devicekey}: use for forwarding WebRTC (RTP Packets) over MQTT -> Complex firewall.
// - kerberos/webrtc/keepalive/{devicekey}: use for keeping alive forwarded WebRTC stream
// - {devicekey}/{sessionid}/answer: once a WebRTC request is received through (kerberos/register), we'll draft an answer and send it back to the remote WebRTC client.
// - kerberos/{hubkey}/device/{devicekey}/motion: a motion signal
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
config := configuration.Config
@@ -109,6 +173,9 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
// We managed to connect to the MQTT broker, hurray!
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
// Create a susbcription for listen and reply
MQTTListenerHandler(c, hubKey, configuration, communication)
// Create a subscription to know if send out a livestream or not.
MQTTListenerHandleLiveSD(c, hubKey, configuration, communication)
@@ -140,6 +207,159 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
return nil
}
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("MQTTListenerHandler: no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
topicOnvif := fmt.Sprintf("kerberos/agent/%s", hubKey)
mqttClient.Subscribe(topicOnvif, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
// mid: string, "unique id for the message"
// timestamp: int64, "unix timestamp when the message was generated"
// encrypted: boolean,
// fingerprint: string, "fingerprint of the message to validate authenticity"
// payload: Payload, "a json object which might be encrypted"
// }
var message Message
json.Unmarshal(msg.Payload(), &message)
if message.Mid != "" && message.Timestamp != 0 {
// Messages might be encrypted, if so we'll
// need to decrypt them.
var payload Payload
if message.Encrypted {
// We'll find out the key we use to decrypt the message.
// TODO -> still needs to be implemented.
// Use to fingerprint to act accordingly.
} else {
payload = message.Payload
}
// We will receive all messages from our hub, so we'll need to filter to the relevant device.
if payload.DeviceId != configuration.Config.Key {
// Not relevant for this device, so we'll ignore it.
} else {
// We'll find out which message we received, and act accordingly.
switch payload.Action {
case "record":
HandleRecording(mqttClient, hubKey, payload, configuration, communication)
case "get-ptz-position":
HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
case "update-ptz-position":
HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication)
}
}
}
})
}
}
// We received a recording request, we'll send it to the motion handler.
type RecordPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
}
func HandleRecording(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value
// Convert map[string]interface{} to RecordPayload
jsonData, _ := json.Marshal(value)
var recordPayload RecordPayload
json.Unmarshal(jsonData, &recordPayload)
if recordPayload.Timestamp != 0 {
motionDataPartial := models.MotionDataPartial{
Timestamp: recordPayload.Timestamp,
}
communication.HandleMotion <- motionDataPartial
}
}
// We received a preset position request, we'll request it through onvif and send it back.
type PTZPositionPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the preset request.
}
func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value
// Convert map[string]interface{} to PTZPositionPayload
jsonData, _ := json.Marshal(value)
var positionPayload PTZPositionPayload
json.Unmarshal(jsonData, &positionPayload)
if positionPayload.Timestamp != 0 {
// Get Position from device
pos, err := onvif.GetPositionFromDevice(*configuration)
if err != nil {
log.Log.Error("HandlePTZPosition: error getting position from device: " + err.Error())
} else {
// Needs to wrapped!
posString := fmt.Sprintf("%f,%f,%f", pos.PanTilt.X, pos.PanTilt.Y, pos.Zoom.X)
message := Message{
Payload: Payload{
Action: "ptz-position",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"timestamp": positionPayload.Timestamp,
"position": posString,
},
},
}
payload, err := PackageMQTTMessage(message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("HandlePTZPosition: something went wrong while sending position to hub: " + string(payload))
}
}
}
}
func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value
// Convert map[string]interface{} to PTZPositionPayload
jsonData, _ := json.Marshal(value)
var onvifAction models.OnvifAction
json.Unmarshal(jsonData, &onvifAction)
if onvifAction.Action != "" {
if communication.CameraConnected {
communication.HandleONVIF <- onvifAction
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
} else {
log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.")
}
}
}
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
if mqttClient != nil {
// Cleanup all subscriptions
// New methods
mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey)
// Legacy methods
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
mqttClient.Unsubscribe("candidate/cloud")
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
mqttClient.Disconnect(1000)
mqttClient = nil
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
}
}
// #################################################################################################
// Below you'll find legacy methods, as of now we'll have a single subscription, which scales better
func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
config := configuration.Config
topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live"
@@ -243,18 +463,3 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
}
})
}
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
if mqttClient != nil {
// Cleanup all subscriptions
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
mqttClient.Unsubscribe("candidate/cloud")
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
mqttClient.Disconnect(1000)
mqttClient = nil
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
}
}

View File

@@ -91,14 +91,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
config := configuration.Config
name := config.Key
deviceKey := config.Key
stunServers := []string{config.STUNURI}
turnServers := []string{config.TURNURI}
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword
// Create WebRTC object
w := CreateWebRTC(name, stunServers, turnServers, turnServersUsername, turnServersCredential)
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
sd, err := w.DecodeSessionDescription(handshake.Sdp)
if err == nil {
@@ -187,7 +187,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candidatesMux.Lock()
defer candidatesMux.Unlock()
topic := fmt.Sprintf("%s/%s/candidate/edge", name, handshake.Cuuid)
topic := fmt.Sprintf("%s/%s/candidate/edge", deviceKey, handshake.Cuuid)
log.Log.Info("InitializeWebRTCConnection: Send candidate to " + topic)
candiInit := candidate.ToJSON()
sdpmid := "0"
@@ -203,7 +203,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
peerConnections[handshake.Cuuid] = peerConnection
if err == nil {
topic := fmt.Sprintf("%s/%s/answer", name, handshake.Cuuid)
topic := fmt.Sprintf("%s/%s/answer", deviceKey, handshake.Cuuid)
log.Log.Info("InitializeWebRTCConnection: Send SDP answer to " + topic)
mqttClient.Publish(topic, 2, false, []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))))
}