mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 13:50:11 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c71cb71d08 | ||
|
|
65a739ea75 | ||
|
|
410a62e9ef | ||
|
|
aa76dd1ec8 | ||
|
|
384448d123 | ||
|
|
414f74758c | ||
|
|
25403ccdab | ||
|
|
4c03132b83 | ||
|
|
470f8f1cb6 |
2
machinery/.vscode/launch.json
vendored
2
machinery/.vscode/launch.json
vendored
@@ -10,7 +10,7 @@
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"program": "main.go",
|
||||
"args": ["-action run"],
|
||||
"args": ["-action", "run"],
|
||||
"envFile": "${workspaceFolder}/.env",
|
||||
"buildFlags": "--tags dynamic",
|
||||
},
|
||||
|
||||
@@ -25,7 +25,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.58
|
||||
github.com/kerberos-io/onvif v0.0.5
|
||||
github.com/kerberos-io/onvif v0.0.6
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
|
||||
@@ -266,8 +266,8 @@ github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+Mrar
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
|
||||
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
|
||||
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kerberos-io/onvif v0.0.6 h1:+nvDuxGzQgHjc7V7kiYxUIcw1bO6R9esAMcxWRiKcwA=
|
||||
github.com/kerberos-io/onvif v0.0.6/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=
|
||||
|
||||
1
machinery/src/api/onvif.go
Normal file
1
machinery/src/api/onvif.go
Normal file
@@ -0,0 +1 @@
|
||||
package api
|
||||
@@ -121,7 +121,7 @@ func HandleUpload(configDirectory string, configuration *models.Configuration, c
|
||||
|
||||
// Check if we need to remove the original recording
|
||||
// removeAfterUpload is set to false by default
|
||||
if config.RemoveAfterUpload == "true" {
|
||||
if config.RemoveAfterUpload != "false" {
|
||||
err := os.Remove(configDirectory + "/data/recordings/" + fileName)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleUpload: " + err.Error())
|
||||
|
||||
@@ -44,7 +44,7 @@ func UploadKerberosHub(configuration *models.Configuration, fileName string) (bo
|
||||
if err != nil {
|
||||
err := "UploadKerberosHub: Upload Failed, file doesn't exists anymore."
|
||||
log.Log.Info(err)
|
||||
return false, true, errors.New(err)
|
||||
return false, false, errors.New(err)
|
||||
}
|
||||
|
||||
// Check if we are allowed to upload to the hub with these credentials.
|
||||
|
||||
@@ -44,7 +44,7 @@ func UploadKerberosVault(configuration *models.Configuration, fileName string) (
|
||||
if err != nil {
|
||||
err := "UploadKerberosVault: Upload Failed, file doesn't exists anymore."
|
||||
log.Log.Info(err)
|
||||
return false, true, errors.New(err)
|
||||
return false, false, errors.New(err)
|
||||
}
|
||||
|
||||
publicKey := config.KStorage.CloudKey
|
||||
|
||||
@@ -84,23 +84,44 @@ func OpenConfig(configDirectory string, configuration *models.Configuration) {
|
||||
collection := db.Collection("configuration")
|
||||
|
||||
var globalConfig models.Config
|
||||
err := collection.FindOne(context.Background(), bson.M{
|
||||
res := collection.FindOne(context.Background(), bson.M{
|
||||
"type": "global",
|
||||
}).Decode(&globalConfig)
|
||||
})
|
||||
|
||||
if res.Err() != nil {
|
||||
log.Log.Error("Could not find global configuration, using default configuration.")
|
||||
panic("Could not find global configuration, using default configuration.")
|
||||
}
|
||||
err := res.Decode(&globalConfig)
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find global configuration, using default configuration.")
|
||||
panic("Could not find global configuration, using default configuration.")
|
||||
}
|
||||
if globalConfig.Type != "global" {
|
||||
log.Log.Error("Could not find global configuration, might missed the mongodb connection.")
|
||||
panic("Could not find global configuration, might missed the mongodb connection.")
|
||||
}
|
||||
|
||||
configuration.GlobalConfig = globalConfig
|
||||
|
||||
var customConfig models.Config
|
||||
deploymentName := os.Getenv("DEPLOYMENT_NAME")
|
||||
err = collection.FindOne(context.Background(), bson.M{
|
||||
res = collection.FindOne(context.Background(), bson.M{
|
||||
"type": "config",
|
||||
"name": deploymentName,
|
||||
}).Decode(&customConfig)
|
||||
})
|
||||
if res.Err() != nil {
|
||||
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
|
||||
}
|
||||
err = res.Decode(&customConfig)
|
||||
if err != nil {
|
||||
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
|
||||
}
|
||||
|
||||
if customConfig.Type != "config" {
|
||||
log.Log.Error("Could not find custom configuration, might missed the mongodb connection.")
|
||||
panic("Could not find custom configuration, might missed the mongodb connection.")
|
||||
}
|
||||
configuration.CustomConfig = customConfig
|
||||
|
||||
// We will merge both configs in a single config file.
|
||||
|
||||
@@ -165,7 +165,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
}
|
||||
|
||||
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
|
||||
if cameraSettings.Initialized {
|
||||
|
||||
if cameraSettings.RTSP != "" && cameraSettings.SubRTSP != "" && cameraSettings.Initialized {
|
||||
decoder.Close()
|
||||
if subStreamEnabled {
|
||||
subDecoder.Close()
|
||||
|
||||
@@ -41,7 +41,8 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
|
||||
log.Log.Info("ProcessMotion: Motion detection enabled.")
|
||||
|
||||
key := config.HubKey
|
||||
hubKey := config.HubKey
|
||||
deviceKey := config.Key
|
||||
|
||||
// Allocate a VideoFrame
|
||||
frame := ffmpeg.AllocVideoFrame()
|
||||
@@ -167,10 +168,10 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
// If offline mode is disabled, send a message to the hub
|
||||
if config.Offline != "true" {
|
||||
if mqttClient != nil {
|
||||
if key != "" {
|
||||
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
if hubKey != "" {
|
||||
mqttClient.Publish("kerberos/"+hubKey+"/device/"+deviceKey+"/motion", 2, false, "motion")
|
||||
} else {
|
||||
mqttClient.Publish("kerberos/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
mqttClient.Publish("kerberos/device/"+deviceKey+"/motion", 2, false, "motion")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,4 +12,7 @@ type OnvifActionPTZ struct {
|
||||
Down int `json:"down" bson:"down"`
|
||||
Center int `json:"center" bson:"center"`
|
||||
Zoom float64 `json:"zoom" bson:"zoom"`
|
||||
X float64 `json:"x" bson:"x"`
|
||||
Y float64 `json:"y" bson:"y"`
|
||||
Z float64 `json:"z" bson:"z"`
|
||||
}
|
||||
|
||||
@@ -45,14 +45,53 @@ func HandleONVIFActions(configuration *models.Configuration, communication *mode
|
||||
|
||||
if err == nil {
|
||||
|
||||
if onvifAction.Action == "ptz" {
|
||||
if onvifAction.Action == "absolute-move" {
|
||||
|
||||
// We will move the camera to zero position.
|
||||
x := ptzAction.X
|
||||
y := ptzAction.Y
|
||||
z := ptzAction.Z
|
||||
|
||||
// Check which PTZ Space we need to use
|
||||
functions, _, _ := GetPTZFunctionsFromDevice(configurations)
|
||||
|
||||
// Log functions
|
||||
log.Log.Info("HandleONVIFActions: functions: " + strings.Join(functions, ", "))
|
||||
|
||||
// Check if we need to use absolute or continuous move
|
||||
canAbsoluteMove := false
|
||||
canContinuousMove := false
|
||||
|
||||
if len(functions) > 0 {
|
||||
for _, function := range functions {
|
||||
if function == "AbsolutePanTiltMove" || function == "AbsoluteZoomMove" {
|
||||
canAbsoluteMove = true
|
||||
} else if function == "ContinuousPanTiltMove" || function == "ContinuousZoomMove" {
|
||||
canContinuousMove = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if canAbsoluteMove {
|
||||
err = AbsolutePanTiltMove(device, configurations, token, x, y, z)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
|
||||
}
|
||||
} else if canContinuousMove {
|
||||
err = AbsolutePanTiltMoveFake(device, configurations, token, x, y, z)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMoveFake): " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
} else if onvifAction.Action == "ptz" {
|
||||
|
||||
if err == nil {
|
||||
|
||||
if ptzAction.Center == 1 {
|
||||
|
||||
// We will move the camera to zero position.
|
||||
err := AbsolutePanTiltMove(device, configurations, token, 0, 0)
|
||||
err := AbsolutePanTiltMove(device, configurations, token, 0, 0, 0)
|
||||
if err != nil {
|
||||
log.Log.Error("HandleONVIFActions (AbsolutePanTitleMove): " + err.Error())
|
||||
}
|
||||
@@ -179,18 +218,83 @@ func GetPTZConfigurationsFromDevice(device *onvif.Device) (ptz.GetConfigurations
|
||||
return configurations, err
|
||||
}
|
||||
|
||||
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float32, tilt float32) error {
|
||||
func GetPositionFromDevice(configuration models.Configuration) (xsd.PTZVector, error) {
|
||||
var position xsd.PTZVector
|
||||
// Connect to Onvif device
|
||||
cameraConfiguration := configuration.Config.Capture.IPCamera
|
||||
device, err := ConnectToOnvifDevice(&cameraConfiguration)
|
||||
if err == nil {
|
||||
|
||||
absoluteVector := xsd.Vector2D{
|
||||
X: float64(pan),
|
||||
Y: float64(tilt),
|
||||
// Get token from the first profile
|
||||
token, err := GetTokenFromProfile(device, 0)
|
||||
if err == nil {
|
||||
// Get the PTZ configurations from the device
|
||||
position, err := GetPosition(device, token)
|
||||
if err == nil {
|
||||
return position, err
|
||||
} else {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
}
|
||||
|
||||
func GetPosition(device *onvif.Device, token xsd.ReferenceToken) (xsd.PTZVector, error) {
|
||||
// We'll try to receive the PTZ configurations from the server
|
||||
var status ptz.GetStatusResponse
|
||||
var position xsd.PTZVector
|
||||
|
||||
// Get the PTZ configurations from the device
|
||||
resp, err := device.CallMethod(ptz.GetStatus{
|
||||
ProfileToken: token,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
defer resp.Body.Close()
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err == nil {
|
||||
stringBody := string(b)
|
||||
decodedXML, et, err := getXMLNode(stringBody, "GetStatusResponse")
|
||||
if err != nil {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
} else {
|
||||
if err := decodedXML.DecodeElement(&status, et); err != nil {
|
||||
log.Log.Error("GetPositionFromDevice: " + err.Error())
|
||||
return position, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
position = status.PTZStatus.Position
|
||||
return position, err
|
||||
}
|
||||
|
||||
func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, tilt float64, zoom float64) error {
|
||||
|
||||
absolutePantiltVector := xsd.Vector2D{
|
||||
X: pan,
|
||||
Y: tilt,
|
||||
Space: configuration.PTZConfiguration.DefaultAbsolutePantTiltPositionSpace,
|
||||
}
|
||||
|
||||
absoluteZoomVector := xsd.Vector1D{
|
||||
X: zoom,
|
||||
Space: configuration.PTZConfiguration.DefaultAbsoluteZoomPositionSpace,
|
||||
}
|
||||
|
||||
res, err := device.CallMethod(ptz.AbsoluteMove{
|
||||
ProfileToken: token,
|
||||
Position: xsd.PTZVector{
|
||||
PanTilt: absoluteVector,
|
||||
PanTilt: absolutePantiltVector,
|
||||
Zoom: absoluteZoomVector,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -204,6 +308,230 @@ func AbsolutePanTiltMove(device *onvif.Device, configuration ptz.GetConfiguratio
|
||||
return err
|
||||
}
|
||||
|
||||
// This function will simulate the AbsolutePanTiltMove function.
|
||||
// However the AboslutePanTiltMove function is not working on all cameras.
|
||||
// So we'll use the ContinuousMove function to simulate the AbsolutePanTiltMove function using the position polling.
|
||||
func AbsolutePanTiltMoveFake(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, tilt float64, zoom float64) error {
|
||||
position, err := GetPosition(device, token)
|
||||
if position.PanTilt.X >= pan-0.01 && position.PanTilt.X <= pan+0.01 && position.PanTilt.Y >= tilt-0.01 && position.PanTilt.Y <= tilt+0.01 && position.Zoom.X >= zoom-0.01 && position.Zoom.X <= zoom+0.01 {
|
||||
log.Log.Debug("AbsolutePanTiltMoveFake: already at position")
|
||||
} else {
|
||||
|
||||
// The speed of panning, the higher the faster we'll pan the camera
|
||||
// value is a range between 0 and 1.
|
||||
speed := 0.6
|
||||
wait := 100 * time.Millisecond
|
||||
|
||||
err := ZoomOutCompletely(device, configuration, token)
|
||||
|
||||
// We'll move quickly to the position (might be inaccurate)
|
||||
err = PanUntilPosition(device, configuration, token, pan, speed, wait)
|
||||
err = TiltUntilPosition(device, configuration, token, tilt, speed, wait)
|
||||
|
||||
// Now we'll move a bit slower to make sure we are ok (will be more accurate)
|
||||
speed = 0.2
|
||||
wait = 200 * time.Millisecond
|
||||
|
||||
err = PanUntilPosition(device, configuration, token, pan, speed, wait)
|
||||
err = TiltUntilPosition(device, configuration, token, tilt, speed, wait)
|
||||
err = ZoomUntilPosition(device, configuration, token, zoom, speed, wait)
|
||||
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func ZoomOutCompletely(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken) error {
|
||||
// Zoom out completely!!!
|
||||
zoomOut := xsd.Vector1D{
|
||||
X: -1,
|
||||
Space: configuration.PTZConfiguration.DefaultContinuousZoomVelocitySpace,
|
||||
}
|
||||
_, err := device.CallMethod(ptz.ContinuousMove{
|
||||
ProfileToken: token,
|
||||
Velocity: xsd.PTZSpeedZoom{
|
||||
Zoom: zoomOut,
|
||||
},
|
||||
})
|
||||
for {
|
||||
position, _ := GetPosition(device, token)
|
||||
if position.Zoom.X == 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
|
||||
device.CallMethod(ptz.Stop{
|
||||
ProfileToken: token,
|
||||
Zoom: true,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func PanUntilPosition(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, speed float64, wait time.Duration) error {
|
||||
position, err := GetPosition(device, token)
|
||||
|
||||
if position.PanTilt.X >= pan-0.01 && position.PanTilt.X <= pan+0.01 {
|
||||
|
||||
} else {
|
||||
|
||||
// We'll need to determine if we need to move CW or CCW.
|
||||
// Check the current position and compare it with the desired position.
|
||||
directionX := speed
|
||||
if position.PanTilt.X > pan {
|
||||
directionX = speed * -1
|
||||
}
|
||||
|
||||
panTiltVector := xsd.Vector2D{
|
||||
X: directionX,
|
||||
Y: 0,
|
||||
Space: configuration.PTZConfiguration.DefaultContinuousPanTiltVelocitySpace,
|
||||
}
|
||||
res, err := device.CallMethod(ptz.ContinuousMove{
|
||||
ProfileToken: token,
|
||||
Velocity: xsd.PTZSpeedPanTilt{
|
||||
PanTilt: panTiltVector,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Pan): " + err.Error())
|
||||
}
|
||||
|
||||
bs, _ := ioutil.ReadAll(res.Body)
|
||||
log.Log.Debug("ContinuousPanTiltMove (Pan): " + string(bs))
|
||||
|
||||
// While moving we'll check if we reached the desired position.
|
||||
// or if we overshot the desired position.
|
||||
for {
|
||||
position, _ := GetPosition(device, token)
|
||||
if position.PanTilt.X == -1 || position.PanTilt.X == 1 || (directionX > 0 && position.PanTilt.X >= pan) || (directionX < 0 && position.PanTilt.X <= pan) || (position.PanTilt.X >= pan-0.005 && position.PanTilt.X <= pan+0.005) {
|
||||
break
|
||||
}
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
_, errStop := device.CallMethod(ptz.Stop{
|
||||
ProfileToken: token,
|
||||
PanTilt: true,
|
||||
})
|
||||
|
||||
if errStop != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Pan): " + errStop.Error())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func TiltUntilPosition(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, tilt float64, speed float64, wait time.Duration) error {
|
||||
position, err := GetPosition(device, token)
|
||||
|
||||
if position.PanTilt.Y >= tilt-0.01 && position.PanTilt.Y <= tilt+0.01 {
|
||||
|
||||
} else {
|
||||
|
||||
// We'll need to determine if we need to move CW or CCW.
|
||||
// Check the current position and compare it with the desired position.
|
||||
directionY := speed
|
||||
if position.PanTilt.Y > tilt {
|
||||
directionY = speed * -1
|
||||
}
|
||||
|
||||
panTiltVector := xsd.Vector2D{
|
||||
X: 0,
|
||||
Y: directionY,
|
||||
Space: configuration.PTZConfiguration.DefaultContinuousPanTiltVelocitySpace,
|
||||
}
|
||||
res, err := device.CallMethod(ptz.ContinuousMove{
|
||||
ProfileToken: token,
|
||||
Velocity: xsd.PTZSpeedPanTilt{
|
||||
PanTilt: panTiltVector,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Tilt): " + err.Error())
|
||||
}
|
||||
|
||||
bs, _ := ioutil.ReadAll(res.Body)
|
||||
log.Log.Debug("ContinuousPanTiltMove (Tilt) " + string(bs))
|
||||
|
||||
// While moving we'll check if we reached the desired position.
|
||||
// or if we overshot the desired position.
|
||||
for {
|
||||
position, _ := GetPosition(device, token)
|
||||
if position.PanTilt.Y == -1 || position.PanTilt.Y == 1 || (directionY > 0 && position.PanTilt.Y >= tilt) || (directionY < 0 && position.PanTilt.Y <= tilt) || (position.PanTilt.Y >= tilt-0.005 && position.PanTilt.Y <= tilt+0.005) {
|
||||
break
|
||||
}
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
_, errStop := device.CallMethod(ptz.Stop{
|
||||
ProfileToken: token,
|
||||
PanTilt: true,
|
||||
})
|
||||
|
||||
if errStop != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Tilt): " + errStop.Error())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func ZoomUntilPosition(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, zoom float64, speed float64, wait time.Duration) error {
|
||||
position, err := GetPosition(device, token)
|
||||
|
||||
if position.Zoom.X >= zoom-0.01 && position.Zoom.X <= zoom+0.01 {
|
||||
|
||||
} else {
|
||||
|
||||
// We'll need to determine if we need to move CW or CCW.
|
||||
// Check the current position and compare it with the desired position.
|
||||
directionZ := speed
|
||||
if position.Zoom.X > zoom {
|
||||
directionZ = speed * -1
|
||||
}
|
||||
|
||||
zoomVector := xsd.Vector1D{
|
||||
X: directionZ,
|
||||
Space: configuration.PTZConfiguration.DefaultContinuousZoomVelocitySpace,
|
||||
}
|
||||
res, err := device.CallMethod(ptz.ContinuousMove{
|
||||
ProfileToken: token,
|
||||
Velocity: xsd.PTZSpeedZoom{
|
||||
Zoom: zoomVector,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Zoom): " + err.Error())
|
||||
}
|
||||
|
||||
bs, _ := ioutil.ReadAll(res.Body)
|
||||
log.Log.Debug("ContinuousPanTiltMove (Zoom) " + string(bs))
|
||||
|
||||
// While moving we'll check if we reached the desired position.
|
||||
// or if we overshot the desired position.
|
||||
for {
|
||||
position, _ := GetPosition(device, token)
|
||||
if position.Zoom.X == -1 || position.Zoom.X == 1 || (directionZ > 0 && position.Zoom.X >= zoom) || (directionZ < 0 && position.Zoom.X <= zoom) || (position.Zoom.X >= zoom-0.005 && position.Zoom.X <= zoom+0.005) {
|
||||
break
|
||||
}
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
_, errStop := device.CallMethod(ptz.Stop{
|
||||
ProfileToken: token,
|
||||
Zoom: true,
|
||||
})
|
||||
|
||||
if errStop != nil {
|
||||
log.Log.Error("ContinuousPanTiltMove (Zoom): " + errStop.Error())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func ContinuousPanTilt(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, tilt float64) error {
|
||||
|
||||
panTiltVector := xsd.Vector2D{
|
||||
@@ -226,7 +554,7 @@ func ContinuousPanTilt(device *onvif.Device, configuration ptz.GetConfigurations
|
||||
bs, _ := ioutil.ReadAll(res.Body)
|
||||
log.Log.Debug("ContinuousPanTiltMove: " + string(bs))
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
res, errStop := device.CallMethod(ptz.Stop{
|
||||
ProfileToken: token,
|
||||
|
||||
@@ -10,9 +10,29 @@ import (
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/agent/machinery/src/onvif"
|
||||
"github.com/kerberos-io/agent/machinery/src/webrtc"
|
||||
)
|
||||
|
||||
// The message structure which is used to send over
|
||||
// and receive messages from the MQTT broker
|
||||
type Message struct {
|
||||
Mid string `json:"mid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
PublicKey string `json:"public_key"`
|
||||
Fingerprint string `json:"fingerprint"`
|
||||
Payload Payload `json:"payload"`
|
||||
}
|
||||
|
||||
// The payload structure which is used to send over
|
||||
// and receive messages from the MQTT broker
|
||||
type Payload struct {
|
||||
Action string `json:"action"`
|
||||
DeviceId string `json:"device_id"`
|
||||
Value map[string]interface{} `json:"value"`
|
||||
}
|
||||
|
||||
// We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection.
|
||||
// If we update the configuration but no new MQTT settings are provided, we don't need to restart it.
|
||||
var PREV_MQTTURI string
|
||||
@@ -34,6 +54,50 @@ func HasMQTTClientModified(configuration *models.Configuration) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func PackageMQTTMessage(msg Message) ([]byte, error) {
|
||||
// We'll generate an unique id, and encrypt it using the private key.
|
||||
msg.Mid = "0123456789+1"
|
||||
msg.Timestamp = time.Now().Unix()
|
||||
msg.Encrypted = false
|
||||
msg.PublicKey = ""
|
||||
msg.Fingerprint = ""
|
||||
payload, err := json.Marshal(msg)
|
||||
return payload, err
|
||||
}
|
||||
|
||||
// Configuring MQTT to subscribe for various bi-directional messaging
|
||||
// Listen and reply (a generic method to share and retrieve information)
|
||||
//
|
||||
// !!! NEW METHOD TO COMMUNICATE: only create a single subscription for all communication.
|
||||
// and an additional publish messages back
|
||||
//
|
||||
// - [SUBSCRIPTION] kerberos/agent/{hubkey} (hub -> agent)
|
||||
// - [PUBLISH] kerberos/hub/{hubkey} (agent -> hub)
|
||||
//
|
||||
// !!! LEGACY METHODS BELOW, WE SHOULD LEVERAGE THE ABOVE METHOD!
|
||||
//
|
||||
// [SUBSCRIPTIONS]
|
||||
//
|
||||
// SD Streaming (Base64 JPEGs)
|
||||
// - kerberos/{hubkey}/device/{devicekey}/request-live: use for polling of SD live streaming (as long the user requests stream, we'll send JPEGs over).
|
||||
//
|
||||
// HD Streaming (WebRTC)
|
||||
// - kerberos/register: use for receiving HD live streaming requests.
|
||||
// - candidate/cloud: remote ICE candidates are shared over this line.
|
||||
// - kerberos/webrtc/keepalivehub/{devicekey}: use for polling of HD streaming (as long the user requests stream, we'll send it over).
|
||||
// - kerberos/webrtc/peers/{devicekey}: we'll keep track of the number of peers (we can have more than 1 concurrent listeners).
|
||||
//
|
||||
// ONVIF capabilities
|
||||
// - kerberos/onvif/{devicekey}: endpoint to execute ONVIF commands such as (PTZ, Zoom, IO, etc)
|
||||
//
|
||||
// [PUBlISH]
|
||||
// Next to subscribing to various topics, we'll also publish messages to various topics, find a list of available Publish methods.
|
||||
//
|
||||
// - kerberos/webrtc/packets/{devicekey}: use for forwarding WebRTC (RTP Packets) over MQTT -> Complex firewall.
|
||||
// - kerberos/webrtc/keepalive/{devicekey}: use for keeping alive forwarded WebRTC stream
|
||||
// - {devicekey}/{sessionid}/answer: once a WebRTC request is received through (kerberos/register), we'll draft an answer and send it back to the remote WebRTC client.
|
||||
// - kerberos/{hubkey}/device/{devicekey}/motion: a motion signal
|
||||
|
||||
func ConfigureMQTT(configuration *models.Configuration, communication *models.Communication) mqtt.Client {
|
||||
|
||||
config := configuration.Config
|
||||
@@ -109,6 +173,9 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
|
||||
// We managed to connect to the MQTT broker, hurray!
|
||||
log.Log.Info("ConfigureMQTT: " + mqttClientID + " connected to " + mqttURL)
|
||||
|
||||
// Create a susbcription for listen and reply
|
||||
MQTTListenerHandler(c, hubKey, configuration, communication)
|
||||
|
||||
// Create a subscription to know if send out a livestream or not.
|
||||
MQTTListenerHandleLiveSD(c, hubKey, configuration, communication)
|
||||
|
||||
@@ -140,6 +207,159 @@ func ConfigureMQTT(configuration *models.Configuration, communication *models.Co
|
||||
return nil
|
||||
}
|
||||
|
||||
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
|
||||
if hubKey == "" {
|
||||
log.Log.Info("MQTTListenerHandler: no hub key provided, not subscribing to kerberos/hub/{hubkey}")
|
||||
} else {
|
||||
topicOnvif := fmt.Sprintf("kerberos/agent/%s", hubKey)
|
||||
mqttClient.Subscribe(topicOnvif, 1, func(c mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
// Decode the message, we are expecting following format.
|
||||
// {
|
||||
// mid: string, "unique id for the message"
|
||||
// timestamp: int64, "unix timestamp when the message was generated"
|
||||
// encrypted: boolean,
|
||||
// fingerprint: string, "fingerprint of the message to validate authenticity"
|
||||
// payload: Payload, "a json object which might be encrypted"
|
||||
// }
|
||||
|
||||
var message Message
|
||||
json.Unmarshal(msg.Payload(), &message)
|
||||
if message.Mid != "" && message.Timestamp != 0 {
|
||||
// Messages might be encrypted, if so we'll
|
||||
// need to decrypt them.
|
||||
var payload Payload
|
||||
if message.Encrypted {
|
||||
// We'll find out the key we use to decrypt the message.
|
||||
// TODO -> still needs to be implemented.
|
||||
// Use to fingerprint to act accordingly.
|
||||
} else {
|
||||
payload = message.Payload
|
||||
}
|
||||
|
||||
// We will receive all messages from our hub, so we'll need to filter to the relevant device.
|
||||
if payload.DeviceId != configuration.Config.Key {
|
||||
// Not relevant for this device, so we'll ignore it.
|
||||
} else {
|
||||
// We'll find out which message we received, and act accordingly.
|
||||
switch payload.Action {
|
||||
case "record":
|
||||
HandleRecording(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "get-ptz-position":
|
||||
HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "update-ptz-position":
|
||||
HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// We received a recording request, we'll send it to the motion handler.
|
||||
type RecordPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
|
||||
}
|
||||
|
||||
func HandleRecording(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to RecordPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var recordPayload RecordPayload
|
||||
json.Unmarshal(jsonData, &recordPayload)
|
||||
|
||||
if recordPayload.Timestamp != 0 {
|
||||
motionDataPartial := models.MotionDataPartial{
|
||||
Timestamp: recordPayload.Timestamp,
|
||||
}
|
||||
communication.HandleMotion <- motionDataPartial
|
||||
}
|
||||
}
|
||||
|
||||
// We received a preset position request, we'll request it through onvif and send it back.
|
||||
type PTZPositionPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the preset request.
|
||||
}
|
||||
|
||||
func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to PTZPositionPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var positionPayload PTZPositionPayload
|
||||
json.Unmarshal(jsonData, &positionPayload)
|
||||
|
||||
if positionPayload.Timestamp != 0 {
|
||||
// Get Position from device
|
||||
pos, err := onvif.GetPositionFromDevice(*configuration)
|
||||
if err != nil {
|
||||
log.Log.Error("HandlePTZPosition: error getting position from device: " + err.Error())
|
||||
} else {
|
||||
// Needs to wrapped!
|
||||
posString := fmt.Sprintf("%f,%f,%f", pos.PanTilt.X, pos.PanTilt.Y, pos.Zoom.X)
|
||||
message := Message{
|
||||
Payload: Payload{
|
||||
Action: "ptz-position",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: map[string]interface{}{
|
||||
"timestamp": positionPayload.Timestamp,
|
||||
"position": posString,
|
||||
},
|
||||
},
|
||||
}
|
||||
payload, err := PackageMQTTMessage(message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
|
||||
} else {
|
||||
log.Log.Info("HandlePTZPosition: something went wrong while sending position to hub: " + string(payload))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to PTZPositionPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var onvifAction models.OnvifAction
|
||||
json.Unmarshal(jsonData, &onvifAction)
|
||||
|
||||
if onvifAction.Action != "" {
|
||||
if communication.CameraConnected {
|
||||
communication.HandleONVIF <- onvifAction
|
||||
log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action)
|
||||
} else {
|
||||
log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions
|
||||
|
||||
// New methods
|
||||
mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey)
|
||||
|
||||
// Legacy methods
|
||||
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
|
||||
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
|
||||
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
|
||||
}
|
||||
}
|
||||
|
||||
// #################################################################################################
|
||||
// Below you'll find legacy methods, as of now we'll have a single subscription, which scales better
|
||||
|
||||
func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) {
|
||||
config := configuration.Config
|
||||
topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live"
|
||||
@@ -243,18 +463,3 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions
|
||||
mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live")
|
||||
mqttClient.Unsubscribe(PREV_AgentKey + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey)
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
log.Log.Info("DisconnectMQTT: MQTT client disconnected.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,14 +91,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
|
||||
config := configuration.Config
|
||||
|
||||
name := config.Key
|
||||
deviceKey := config.Key
|
||||
stunServers := []string{config.STUNURI}
|
||||
turnServers := []string{config.TURNURI}
|
||||
turnServersUsername := config.TURNUsername
|
||||
turnServersCredential := config.TURNPassword
|
||||
|
||||
// Create WebRTC object
|
||||
w := CreateWebRTC(name, stunServers, turnServers, turnServersUsername, turnServersCredential)
|
||||
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
|
||||
sd, err := w.DecodeSessionDescription(handshake.Sdp)
|
||||
|
||||
if err == nil {
|
||||
@@ -187,7 +187,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candidatesMux.Lock()
|
||||
defer candidatesMux.Unlock()
|
||||
|
||||
topic := fmt.Sprintf("%s/%s/candidate/edge", name, handshake.Cuuid)
|
||||
topic := fmt.Sprintf("%s/%s/candidate/edge", deviceKey, handshake.Cuuid)
|
||||
log.Log.Info("InitializeWebRTCConnection: Send candidate to " + topic)
|
||||
candiInit := candidate.ToJSON()
|
||||
sdpmid := "0"
|
||||
@@ -203,7 +203,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
peerConnections[handshake.Cuuid] = peerConnection
|
||||
|
||||
if err == nil {
|
||||
topic := fmt.Sprintf("%s/%s/answer", name, handshake.Cuuid)
|
||||
topic := fmt.Sprintf("%s/%s/answer", deviceKey, handshake.Cuuid)
|
||||
log.Log.Info("InitializeWebRTCConnection: Send SDP answer to " + topic)
|
||||
mqttClient.Publish(topic, 2, false, []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))))
|
||||
}
|
||||
|
||||
@@ -729,7 +729,7 @@ class Settings extends React.Component {
|
||||
/>
|
||||
)}
|
||||
{verifyOnvifError && (
|
||||
<InfoBar type="alert" message={`${verifyOnvifErrorMessage}`} />
|
||||
<InfoBar type="alert" message={verifyOnvifErrorMessage} />
|
||||
)}
|
||||
|
||||
{loadingHub && (
|
||||
|
||||
Reference in New Issue
Block a user