mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 13:50:11 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa76dd1ec8 | ||
|
|
384448d123 |
2
machinery/.vscode/launch.json
vendored
2
machinery/.vscode/launch.json
vendored
@@ -10,7 +10,7 @@
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"program": "main.go",
|
||||
"args": ["-action run"],
|
||||
"args": ["-action", "run"],
|
||||
"envFile": "${workspaceFolder}/.env",
|
||||
"buildFlags": "--tags dynamic",
|
||||
},
|
||||
|
||||
@@ -25,7 +25,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.58
|
||||
github.com/kerberos-io/onvif v0.0.5
|
||||
github.com/kerberos-io/onvif v0.0.6
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
|
||||
@@ -266,8 +266,8 @@ github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+Mrar
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
|
||||
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
|
||||
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kerberos-io/onvif v0.0.6 h1:+nvDuxGzQgHjc7V7kiYxUIcw1bO6R9esAMcxWRiKcwA=
|
||||
github.com/kerberos-io/onvif v0.0.6/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=
|
||||
|
||||
1
machinery/src/api/onvif.go
Normal file
1
machinery/src/api/onvif.go
Normal file
@@ -0,0 +1 @@
|
||||
package api
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
|
||||
func CleanupRecordingDirectory(configDirectory string, configuration *models.Configuration) {
|
||||
autoClean := configuration.Config.AutoClean
|
||||
if autoClean != "false" {
|
||||
if autoClean == "true" {
|
||||
maxSize := configuration.Config.MaxDirectorySize
|
||||
if maxSize == 0 {
|
||||
maxSize = 300
|
||||
|
||||
@@ -44,7 +44,7 @@ func UploadKerberosHub(configuration *models.Configuration, fileName string) (bo
|
||||
if err != nil {
|
||||
err := "UploadKerberosHub: Upload Failed, file doesn't exists anymore."
|
||||
log.Log.Info(err)
|
||||
return false, true, errors.New(err)
|
||||
return false, false, errors.New(err)
|
||||
}
|
||||
|
||||
// Check if we are allowed to upload to the hub with these credentials.
|
||||
|
||||
@@ -44,7 +44,7 @@ func UploadKerberosVault(configuration *models.Configuration, fileName string) (
|
||||
if err != nil {
|
||||
err := "UploadKerberosVault: Upload Failed, file doesn't exists anymore."
|
||||
log.Log.Info(err)
|
||||
return false, true, errors.New(err)
|
||||
return false, false, errors.New(err)
|
||||
}
|
||||
|
||||
publicKey := config.KStorage.CloudKey
|
||||
|
||||
@@ -90,11 +90,18 @@ func OpenConfig(configDirectory string, configuration *models.Configuration) {
|
||||
|
||||
if res.Err() != nil {
|
||||
log.Log.Error("Could not find global configuration, using default configuration.")
|
||||
panic("Could not find global configuration, using default configuration.")
|
||||
}
|
||||
err := res.Decode(&globalConfig)
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find global configuration, using default configuration.")
|
||||
panic("Could not find global configuration, using default configuration.")
|
||||
}
|
||||
if globalConfig.Type != "global" {
|
||||
log.Log.Error("Could not find global configuration, might missed the mongodb connection.")
|
||||
panic("Could not find global configuration, might missed the mongodb connection.")
|
||||
}
|
||||
|
||||
configuration.GlobalConfig = globalConfig
|
||||
|
||||
var customConfig models.Config
|
||||
@@ -110,6 +117,11 @@ func OpenConfig(configDirectory string, configuration *models.Configuration) {
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
|
||||
}
|
||||
|
||||
if customConfig.Type != "config" {
|
||||
log.Log.Error("Could not find custom configuration, might missed the mongodb connection.")
|
||||
panic("Could not find custom configuration, might missed the mongodb connection.")
|
||||
}
|
||||
configuration.CustomConfig = customConfig
|
||||
|
||||
// We will merge both configs in a single config file.
|
||||
|
||||
@@ -41,7 +41,8 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
|
||||
log.Log.Info("ProcessMotion: Motion detection enabled.")
|
||||
|
||||
key := config.HubKey
|
||||
hubKey := config.HubKey
|
||||
deviceKey := config.Key
|
||||
|
||||
// Allocate a VideoFrame
|
||||
frame := ffmpeg.AllocVideoFrame()
|
||||
@@ -167,10 +168,10 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
// If offline mode is disabled, send a message to the hub
|
||||
if config.Offline != "true" {
|
||||
if mqttClient != nil {
|
||||
if key != "" {
|
||||
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
if hubKey != "" {
|
||||
mqttClient.Publish("kerberos/"+hubKey+"/device/"+deviceKey+"/motion", 2, false, "motion")
|
||||
} else {
|
||||
mqttClient.Publish("kerberos/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
mqttClient.Publish("kerberos/device/"+deviceKey+"/motion", 2, false, "motion")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,4 +12,7 @@ type OnvifActionPTZ struct {
|
||||
Down int `json:"down" bson:"down"`
|
||||
Center int `json:"center" bson:"center"`
|
||||
Zoom float64 `json:"zoom" bson:"zoom"`
|
||||
X float64 `json:"x" bson:"x"`
|
||||
Y float64 `json:"y" bson:"y"`
|
||||
Z float64 `json:"z" bson:"z"`
|
||||
}
|
||||
|
||||
@@ -45,14 +45,25 @@ func HandleONVIFActions(configuration *models.Configuration, communication *mode
|
||||
|
||||
if err == nil {
|
||||
|
||||
if onvifAction.Action == "ptz" {
|
||||
if onvifAction.Action == "absolute-move" {
|
||||
|
||||
// We will move the camera to zero position.
|
||||
x := ptzAction.X
|
||||
y := ptzAction.Y
|
||||
z := ptzAction.Z
|
||||
err := AbsolutePanTiltMove(device, configurations, token, x, y, z)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
|
||||
}
|
||||
|
||||
} else if onvifAction.Action == "ptz" {
|
||||
|
||||
if err == nil {
|
||||
|
||||
if ptzAction.Center == 1 {
|
||||
|
||||
// We will move the camera to zero position.
|
||||
err := AbsolutePanTiltMove(device, configurations, token, 0, 0)
|
||||
err := AbsolutePanTiltMove(device, configurations, token, 0, 0, 0)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
|
||||
}
|
||||
@@ -179,18 +190,73 @@ func GetPTZConfigurationsFromDevice(device *onvif.Device) (ptz.GetConfigurations
|
||||
return configurations, err
|
||||
}
|
||||
|
||||
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float32, tilt float32) error {
|
||||
func GetPositionFromDevice(configuration models.Configuration) (xsd.PTZVector, error) {
|
||||
|
||||
absoluteVector := xsd.Vector2D{
|
||||
X: float64(pan),
|
||||
Y: float64(tilt),
|
||||
// We'll try to receive the PTZ configurations from the server
|
||||
var status ptz.GetStatusResponse
|
||||
var position xsd.PTZVector
|
||||
|
||||
// Connect to Onvif device
|
||||
cameraConfiguration := configuration.Config.Capture.IPCamera
|
||||
device, err := ConnectToOnvifDevice(&cameraConfiguration)
|
||||
if err == nil {
|
||||
|
||||
// Get token from the first profile
|
||||
token, err := GetTokenFromProfile(device, 0)
|
||||
if err == nil {
|
||||
|
||||
// Get the PTZ configurations from the device
|
||||
resp, err := device.CallMethod(ptz.GetStatus{
|
||||
ProfileToken: token,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
defer resp.Body.Close()
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err == nil {
|
||||
stringBody := string(b)
|
||||
decodedXML, et, err := getXMLNode(stringBody, "GetStatusResponse")
|
||||
if err != nil {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
} else {
|
||||
if err := decodedXML.DecodeElement(&status, et); err != nil {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
position = status.PTZStatus.Position
|
||||
return position, err
|
||||
} else {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
}
|
||||
|
||||
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, tilt float64, zoom float64) error {
|
||||
|
||||
absolutePantiltVector := xsd.Vector2D{
|
||||
X: pan,
|
||||
Y: tilt,
|
||||
Space: configuration.PTZConfiguration.DefaultAbsolutePantTiltPositionSpace,
|
||||
}
|
||||
|
||||
absoluteZoomVector := xsd.Vector1D{
|
||||
X: zoom,
|
||||
Space: configuration.PTZConfiguration.DefaultAbsoluteZoomPositionSpace,
|
||||
}
|
||||
|
||||
res, err := device.CallMethod(ptz.AbsoluteMove{
|
||||
ProfileToken: token,
|
||||
Position: xsd.PTZVector{
|
||||
PanTilt: absoluteVector,
|
||||
PanTilt: absolutePantiltVector,
|
||||
Zoom: absoluteZoomVector,
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -10,9 +10,29 @@ import (
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/agent/machinery/src/onvif"
|
||||
"github.com/kerberos-io/agent/machinery/src/webrtc"
|
||||
)
|
||||
|
||||
// The message structure which is used to send over
|
||||
// and receive messages from the MQTT broker
|
||||
type Message struct {
|
||||
Mid string `json:"mid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
PublicKey string `json:"public_key"`
|
||||
Fingerprint string `json:"fingerprint"`
|
||||
Payload Payload `json:"payload"`
|
||||
}
|
||||
|
||||
// The payload structure which is used to send over
|
||||
// and receive messages from the MQTT broker
|
||||
type Payload struct {
|
||||
Action string `json:"action"`
|
||||
DeviceId string `json:"device_id"`
|
||||
Value map[string]interface{} `json:"value"`
|
||||
}
|
||||
|
||||
// We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection.
|
||||
// If we update the configuration but no new MQTT settings are provided, we don't need to restart it.
|
||||
var PREV_MQTTURI string
|
||||
@@ -34,6 +54,50 @@ func HasMQTTClientModified(configuration *models.Configuration) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func PackageMQTTMessage(msg Message) ([]byte, error) {
|
||||
// We'll generate an unique id, and encrypt it using the private key.
|
||||
msg.Mid = "0123456789+1"
|
||||
msg.Timestamp = time.Now().Unix()
|
||||
msg.Encrypted = false
|
||||
msg.PublicKey = ""
|
||||
msg.Fingerprint = ""
|
||||
payload, err := json.Marshal(msg)
|
||||
return payload, err
|
||||
}
|
||||
|
||||
// Configuring MQTT to subscribe for various bi-directional messaging
|
||||
// Listen and reply (a generic method to share and retrieve information)
|
||||
//
|
||||
// !!! NEW METHOD TO COMMUNICATE: only create a single subscription for all communication.
|
||||
// and an additional publish messages back
|
||||
//
|
||||
// - [SUBSCRIPTION] kerberos/agent/{hubkey} (hub -> agent)
|
||||
// - [PUBLISH] kerberos/hub/{hubkey} (agent -> hub)
|
||||
//
|
||||
// !!! LEGACY METHODS BELOW, WE SHOULD LEVERAGE THE ABOVE METHOD!
|
||||
//
|
||||
// [SUBSCRIPTIONS]
|
||||
//
|
||||
// SD Streaming (Base64 JPEGs)
|
||||
// - kerberos/{hubkey}/device/{devicekey}/request-live: use for polling of SD live streaming (as long the user requests stream, we'll send JPEGs over).
|
||||
//
|
||||
// HD Streaming (WebRTC)
|
||||
// - kerberos/register: use for receiving HD live streaming requests.
|
||||
// - candidate/cloud: remote ICE candidates are shared over this line.
|
||||
// - kerberos/webrtc/keepalivehub/{devicekey}: use for polling of HD streaming (as long the user requests stream, we'll send it over).
|
||||
// - kerberos/webrtc/peers/{devicekey}: we'll keep track of the number of peers (we can have more than 1 concurrent listeners).
|
||||
//
|
||||
// ONVIF capabilities
|
||||
// - kerberos/onvif/{devicekey}: endpoint to execute ONVIF commands such as (PTZ, Zoom, IO, etc)
|
||||
//
|
||||
// [PUBlISH]
|
||||
// Next to subscribing to various topics, we'll also publish messages to various topics, find a list of available Publish methods.
|
||||
//
|
||||
// - kerberos/webrtc/packets/{devicekey}: use for forwarding WebRTC (RTP Packets) over MQTT -> Complex firewall.
|
||||
// - kerberos/webrtc/keepalive/{devicekey}: use for keeping alive forwarded WebRTC stream
|
||||
// - {devicekey}/{sessionid}/answer: once a WebRTC request is received through (kerberos/register), we'll draft an answer and send it back to the remote WebRTC client.
|
||||
// - kerberos/{hubkey}/device/{devicekey}/motion: a motion signal
|
||||
|
||||
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
|
||||
|
||||
config := configuration.Config
|
||||
@@ -109,6 +173,9 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
|
||||
// We managed to connect to the MQTT broker, hurray!
|
||||
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
|
||||
|
||||
// Create a susbcription for listen and reply
|
||||
MQTTListenerHandler(c, hubKey, configuration, communication)
|
||||
|
||||
// Create a subscription to know if send out a livestream or not.
|
||||
MQTTListenerHandleLiveSD(c, hubKey, configuration, communication)
|
||||
|
||||
@@ -140,6 +207,159 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
|
||||
return nil
|
||||
}
|
||||
|
||||
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
|
||||
if hubKey == "" {
|
||||
log.Log.Info("MQTTListenerHandler: no hub key provided, not subscribing to kerberos/hub/{hubkey}")
|
||||
} else {
|
||||
topicOnvif := fmt.Sprintf("kerberos/agent/%s", hubKey)
|
||||
mqttClient.Subscribe(topicOnvif, 1, func(c mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
// Decode the message, we are expecting following format.
|
||||
// {
|
||||
// mid: string, "unique id for the message"
|
||||
// timestamp: int64, "unix timestamp when the message was generated"
|
||||
// encrypted: boolean,
|
||||
// fingerprint: string, "fingerprint of the message to validate authenticity"
|
||||
// payload: Payload, "a json object which might be encrypted"
|
||||
// }
|
||||
|
||||
var message Message
|
||||
json.Unmarshal(msg.Payload(), &message)
|
||||
if message.Mid != "" && message.Timestamp != 0 {
|
||||
// Messages might be encrypted, if so we'll
|
||||
// need to decrypt them.
|
||||
var payload Payload
|
||||
if message.Encrypted {
|
||||
// We'll find out the key we use to decrypt the message.
|
||||
// TODO -> still needs to be implemented.
|
||||
// Use to fingerprint to act accordingly.
|
||||
} else {
|
||||
payload = message.Payload
|
||||
}
|
||||
|
||||
// We will receive all messages from our hub, so we'll need to filter to the relevant device.
|
||||
if payload.DeviceId != configuration.Config.Key {
|
||||
// Not relevant for this device, so we'll ignore it.
|
||||
} else {
|
||||
// We'll find out which message we received, and act accordingly.
|
||||
switch payload.Action {
|
||||
case "record":
|
||||
HandleRecording(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "get-ptz-position":
|
||||
HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "update-ptz-position":
|
||||
HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// We received a recording request, we'll send it to the motion handler.
|
||||
type RecordPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
|
||||
}
|
||||
|
||||
func HandleRecording(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to RecordPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var recordPayload RecordPayload
|
||||
json.Unmarshal(jsonData, &recordPayload)
|
||||
|
||||
if recordPayload.Timestamp != 0 {
|
||||
motionDataPartial := models.MotionDataPartial{
|
||||
Timestamp: recordPayload.Timestamp,
|
||||
}
|
||||
communication.HandleMotion <- motionDataPartial
|
||||
}
|
||||
}
|
||||
|
||||
// We received a preset position request, we'll request it through onvif and send it back.
|
||||
type PTZPositionPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the preset request.
|
||||
}
|
||||
|
||||
func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to PTZPositionPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var positionPayload PTZPositionPayload
|
||||
json.Unmarshal(jsonData, &positionPayload)
|
||||
|
||||
if positionPayload.Timestamp != 0 {
|
||||
// Get Position from device
|
||||
pos, err := onvif.GetPositionFromDevice(*configuration)
|
||||
if err != nil {
|
||||
log.Log.Error("HandlePTZPosition: error getting position from device: " + err.Error())
|
||||
} else {
|
||||
// Needs to wrapped!
|
||||
posString := fmt.Sprintf("%f,%f,%f", pos.PanTilt.X, pos.PanTilt.Y, pos.Zoom.X)
|
||||
message := Message{
|
||||
Payload: Payload{
|
||||
Action: "ptz-position",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: map[string]interface{}{
|
||||
"timestamp": positionPayload.Timestamp,
|
||||
"position": posString,
|
||||
},
|
||||
},
|
||||
}
|
||||
payload, err := PackageMQTTMessage(message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
|
||||
} else {
|
||||
log.Log.Info("HandlePTZPosition: something went wrong while sending position to hub: " + string(payload))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to PTZPositionPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var onvifAction models.OnvifAction
|
||||
json.Unmarshal(jsonData, &onvifAction)
|
||||
|
||||
if onvifAction.Action != "" {
|
||||
if communication.CameraConnected {
|
||||
communication.HandleONVIF <- onvifAction
|
||||
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions
|
||||
|
||||
// New methods
|
||||
mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey)
|
||||
|
||||
// Legacy methods
|
||||
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
|
||||
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
|
||||
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
|
||||
}
|
||||
}
|
||||
|
||||
// #################################################################################################
|
||||
// Below you'll find legacy methods, as of now we'll have a single subscription, which scales better
|
||||
|
||||
func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
|
||||
config := configuration.Config
|
||||
topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live"
|
||||
@@ -243,18 +463,3 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions
|
||||
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
|
||||
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,14 +91,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
|
||||
config := configuration.Config
|
||||
|
||||
name := config.Key
|
||||
deviceKey := config.Key
|
||||
stunServers := []string{config.STUNURI}
|
||||
turnServers := []string{config.TURNURI}
|
||||
turnServersUsername := config.TURNUsername
|
||||
turnServersCredential := config.TURNPassword
|
||||
|
||||
// Create WebRTC object
|
||||
w := CreateWebRTC(name, stunServers, turnServers, turnServersUsername, turnServersCredential)
|
||||
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
|
||||
sd, err := w.DecodeSessionDescription(handshake.Sdp)
|
||||
|
||||
if err == nil {
|
||||
@@ -187,7 +187,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candidatesMux.Lock()
|
||||
defer candidatesMux.Unlock()
|
||||
|
||||
topic := fmt.Sprintf("%s/%s/candidate/edge", name, handshake.Cuuid)
|
||||
topic := fmt.Sprintf("%s/%s/candidate/edge", deviceKey, handshake.Cuuid)
|
||||
log.Log.Info("InitializeWebRTCConnection: Send candidate to " + topic)
|
||||
candiInit := candidate.ToJSON()
|
||||
sdpmid := "0"
|
||||
@@ -203,7 +203,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
peerConnections[handshake.Cuuid] = peerConnection
|
||||
|
||||
if err == nil {
|
||||
topic := fmt.Sprintf("%s/%s/answer", name, handshake.Cuuid)
|
||||
topic := fmt.Sprintf("%s/%s/answer", deviceKey, handshake.Cuuid)
|
||||
log.Log.Info("InitializeWebRTCConnection: Send SDP answer to " + topic)
|
||||
mqttClient.Publish(topic, 2, false, []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user