Compare commits

...

5 Commits

Author SHA1 Message Date
Cedric Verstraeten
229c246e1c adding ctx (support) + unblock when unsupported codec 2023-06-08 21:50:10 +02:00
Cedric Verstraeten
15d9bcda4f decoding issue caused new mongodb adapter to fail 2023-06-07 22:01:44 +02:00
Cedric Verstraeten
068063695e time table might be empty 2023-06-07 20:24:09 +02:00
Cedric Verstraeten
b1722844f3 fix config overriding 2023-06-07 20:19:50 +02:00
Cedric Verstraeten
eb5ab48d6c timetable might be empty 2023-06-07 18:47:48 +02:00
11 changed files with 107 additions and 55 deletions

View File

@@ -2,6 +2,9 @@ module github.com/kerberos-io/agent/machinery
go 1.19
// replace github.com/kerberos-io/joy4 v1.0.57 => ../../../../github.com/kerberos-io/joy4
// replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
require (
github.com/InVisionApp/conjungo v1.1.0
github.com/appleboy/gin-jwt/v2 v2.9.1
@@ -21,7 +24,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.57
github.com/kerberos-io/joy4 v1.0.58
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.57 h1:/8epNAJv4cOzBG8pFiM9hVNXfwsgA+8/2nHQ2yOeyII=
github.com/kerberos-io/joy4 v1.0.57/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.58 h1:R8EECSF+bG7o2yHC6cX/lF77Z+bDVGl6OioLZ3+5MN4=
github.com/kerberos-io/joy4 v1.0.58/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"os"
"time"
@@ -16,7 +17,6 @@ import (
var VERSION = "3.0.0"
func main() {
// You might be interested in debugging the agent.
if os.Getenv("DATADOG_AGENT_ENABLED") == "true" {
if os.Getenv("DATADOG_AGENT_K8S_ENABLED") == "true" {
@@ -111,8 +111,14 @@ func main() {
}
}
// Create a cancelable context, which will be used to cancel and restart.
// This is used to restart the agent when the configuration is updated.
ctx, cancel := context.WithCancel(context.Background())
// Bootstrapping the agent
communication := models.Communication{
Context: &ctx,
CancelContext: &cancel,
HandleBootstrap: make(chan string, 1),
}
go components.Bootstrap(&configuration, &communication)

View File

@@ -1,6 +1,7 @@
package capture
import (
"context"
"strconv"
"sync"
"time"
@@ -15,9 +16,9 @@ import (
"github.com/kerberos-io/joy4/format"
)
func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
format.RegisterAll()
infile, err := avutil.Open(url)
infile, err := avutil.Open(ctx, url)
if err == nil {
streams, errstreams := infile.Streams()
return infile, streams, errstreams

View File

@@ -2,6 +2,7 @@
package capture
import (
"context"
"os"
"strconv"
"time"
@@ -431,6 +432,10 @@ func VerifyCamera(c *gin.Context) {
var cameraStreams models.CameraStreams
err := c.BindJSON(&cameraStreams)
// Should return in 5 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err == nil {
streamType := c.Param("streamType")
@@ -442,7 +447,7 @@ func VerifyCamera(c *gin.Context) {
if streamType == "secondary" {
rtspUrl = cameraStreams.SubRTSP
}
_, codecs, err := OpenRTSP(rtspUrl)
_, codecs, err := OpenRTSP(ctx, rtspUrl)
if err == nil {
videoIdx := -1

View File

@@ -83,17 +83,25 @@ func OpenConfig(configuration *models.Configuration) {
db := client.Database(database.DatabaseName)
collection := db.Collection("configuration")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collection.FindOne(ctx, bson.M{
var globalConfig models.Config
err := collection.FindOne(context.Background(), bson.M{
"type": "global",
}).Decode(&configuration.GlobalConfig)
}).Decode(&globalConfig)
if err != nil {
log.Log.Error("Could not find global configuration, using default configuration.")
}
configuration.GlobalConfig = globalConfig
collection.FindOne(ctx, bson.M{
var customConfig models.Config
deploymentName := os.Getenv("DEPLOYMENT_NAME")
err = collection.FindOne(context.Background(), bson.M{
"type": "config",
"name": os.Getenv("DEPLOYMENT_NAME"),
}).Decode(&configuration.CustomConfig)
"name": deploymentName,
}).Decode(&customConfig)
if err != nil {
log.Log.Error("Could not find configuration for " + deploymentName + ", using global configuration.")
}
configuration.CustomConfig = customConfig
// We will merge both configs in a single config file.
// Read again from database but this store overwrite the same object.
@@ -209,8 +217,7 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
/* ONVIF connnection settings */
case "AGENT_CAPTURE_IPCAMERA_ONVIF":
isEnabled := value == " true"
configuration.Config.Capture.IPCamera.ONVIF = isEnabled
configuration.Config.Capture.IPCamera.ONVIF = value
break
case "AGENT_CAPTURE_IPCAMERA_ONVIF_XADDR":
configuration.Config.Capture.IPCamera.ONVIFXAddr = value

View File

@@ -1,6 +1,7 @@
package components
import (
"context"
"runtime"
"strconv"
"sync"
@@ -73,14 +74,22 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
for {
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
if status == "stop" {
break
}
// We will re open the configuration, might have changed :O!
OpenConfig(configuration)
// We will override the configuration with the environment variables
OverrideWithEnvironmentVariables(configuration)
// We will create a new cancelable context, which will be used to cancel and restart.
// This is used to restart the agent when the configuration is updated.
ctx, cancel := context.WithCancel(context.Background())
communication.Context = &ctx
communication.CancelContext = &cancel
}
log.Log.Debug("Bootstrap: finished")
}
@@ -90,10 +99,27 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
log.Log.Debug("RunAgent: bootstrapping agent")
config := configuration.Config
status := "not started"
// Currently only support H264 encoded cameras, this will change.
// Establishing the camera connection
rtspUrl := config.Capture.IPCamera.RTSP
infile, streams, err := capture.OpenRTSP(rtspUrl)
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
if videoStream == nil {
log.Log.Error("RunAgent: no video stream found, might be the wrong codec (we only support H264 for the moment)")
time.Sleep(time.Second * 3)
return status
}
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
var queue *pubsub.Queue
var subQueue *pubsub.Queue
@@ -101,8 +127,6 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
var decoderMutex sync.Mutex
var subDecoderMutex sync.Mutex
status := "not started"
if err == nil {
log.Log.Info("RunAgent: opened RTSP stream" + rtspUrl)
@@ -113,23 +137,21 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
subStreamEnabled := false
subRtspUrl := config.Capture.IPCamera.SubRTSP
if subRtspUrl != "" && subRtspUrl != rtspUrl {
subInfile, subStreams, err = capture.OpenRTSP(subRtspUrl)
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
if err == nil {
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
subStreamEnabled = true
}
videoStream, _ := capture.GetVideoStream(subStreams)
if videoStream == nil {
log.Log.Error("RunAgent: no video substream found, might be the wrong codec (we only support H264 for the moment)")
time.Sleep(time.Second * 3)
return status
}
}
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
if cameraSettings.Initialized {
decoder.Close()
if subStreamEnabled {
@@ -240,8 +262,12 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// This will go into a blocking state, once this channel is triggered
// the agent will cleanup and restart.
status = <-communication.HandleBootstrap
// Cancel the main context, this will stop all the other goroutines.
(*communication.CancelContext)()
// Here we are cleaning up everything!
if configuration.Config.Offline != "true" {
communication.HandleUpload <- "stop"

View File

@@ -10,15 +10,12 @@ import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
geo "github.com/kellydunn/golang-geo"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/joy4/av/pubsub"
//"github.com/whorfin/go-libjpeg/jpeg"
geo "github.com/kellydunn/golang-geo"
"github.com/kerberos-io/joy4/av"
"github.com/kerberos-io/joy4/av/pubsub"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
)
@@ -142,19 +139,21 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
hour := now.Hour()
minute := now.Minute()
second := now.Second()
timeInterval := config.Timetable[int(weekday)]
if timeInterval != nil {
start1 := timeInterval.Start1
end1 := timeInterval.End1
start2 := timeInterval.Start2
end2 := timeInterval.End2
currentTimeInSeconds := hour*60*60 + minute*60 + second
if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) ||
(currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) {
if config.Timetable != nil {
timeInterval := config.Timetable[int(weekday)]
if timeInterval != nil {
start1 := timeInterval.Start1
end1 := timeInterval.End1
start2 := timeInterval.Start2
end2 := timeInterval.End2
currentTimeInSeconds := hour*60*60 + minute*60 + second
if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) ||
(currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) {
} else {
detectMotion = false
log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.")
} else {
detectMotion = false
log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.")
}
}
}
}

View File

@@ -1,6 +1,7 @@
package models
import (
"context"
"sync"
"sync/atomic"
@@ -12,6 +13,8 @@ import (
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
Context *context.Context
CancelContext *context.CancelFunc
PackageCounter *atomic.Value
LastPacketTimer *atomic.Value
CloudTimestamp *atomic.Value

View File

@@ -73,7 +73,7 @@ type IPCamera struct {
RTSP string `json:"rtsp"`
SubRTSP string `json:"sub_rtsp"`
FPS string `json:"fps"`
ONVIF bool `json:"onvif,omitempty" bson:"onvif"`
ONVIF string `json:"onvif,omitempty" bson:"onvif"`
ONVIFXAddr string `json:"onvif_xaddr,omitempty" bson:"onvif_xaddr"`
ONVIFUsername string `json:"onvif_username,omitempty" bson:"onvif_username"`
ONVIFPassword string `json:"onvif_password,omitempty" bson:"onvif_password"`

View File

@@ -224,13 +224,15 @@ class Settings extends React.Component {
calculateTimetable(timetable) {
this.timetable = timetable;
for (let i = 0; i < timetable.length; i += 1) {
const time = timetable[i];
const { start1, start2, end1, end2 } = time;
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
if (this.timetable) {
for (let i = 0; i < timetable.length; i += 1) {
const time = timetable[i];
const { start1, start2, end1, end2 } = time;
this.timetable[i].start1Full = this.convertSecondsToHourMinute(start1);
this.timetable[i].start2Full = this.convertSecondsToHourMinute(start2);
this.timetable[i].end1Full = this.convertSecondsToHourMinute(end1);
this.timetable[i].end2Full = this.convertSecondsToHourMinute(end2);
}
}
}