Compare commits

...

13 Commits

Author SHA1 Message Date
Cédric Verstraeten
943e81000e Update Dockerfile 2023-03-07 20:15:00 +01:00
Cedric Verstraeten
36b93a34b4 check if QUEUE is not null ;) 2023-02-23 15:16:41 +01:00
Cedric Verstraeten
b0d2409524 stop the motion and livestreaming threads first 2023-02-23 14:58:48 +01:00
Cedric Verstraeten
be7a231950 reset configuration 2023-02-23 12:20:25 +01:00
Cedric Verstraeten
31a0b9efa4 enable the rest! 2023-02-21 22:54:33 +01:00
Cedric Verstraeten
d70a3ed343 do not reload decoder if same settings 2023-02-21 22:44:56 +01:00
Cedric Verstraeten
56cebb6451 manage decoder from higher level 2023-02-21 22:12:08 +01:00
Cedric Verstraeten
99f61bc5e8 enable decoder again ;) 2023-02-21 21:45:54 +01:00
Cedric Verstraeten
a5d02e3275 disable decoder, see what it's doing from mem consumption 2023-02-21 21:31:17 +01:00
Cedric Verstraeten
354ab7db05 add garbage collection 2023-02-21 20:49:44 +01:00
Cedric Verstraeten
dc817f8c26 keep snapshot in memory (no longer store on disk) 2023-02-21 12:54:28 +01:00
Cedric Verstraeten
a90097731c fix error 2023-02-21 12:13:29 +01:00
Cedric Verstraeten
5b3bbbb37e new approach to store snapshots! 2023-02-21 12:12:30 +01:00
12 changed files with 101 additions and 73 deletions

View File

@@ -1,2 +1,2 @@
FROM kerberos/devcontainer:9da0ee3
FROM kerberos/devcontainer:b2bc659
LABEL AUTHOR=Kerberos.io

View File

@@ -1,3 +1,4 @@
{
"type": "",
"key": "",

View File

@@ -2,7 +2,8 @@ module github.com/kerberos-io/agent/machinery
go 1.19
//replace github.com/kerberos-io/joy4 v1.0.53 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.54 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
require (
@@ -20,7 +21,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.54
github.com/kerberos-io/joy4 v1.0.55
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -175,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.54 h1:Ct4G00sk/iLqm+wLV0gQWDxnKciAnLiTnuxF8hufcsc=
github.com/kerberos-io/joy4 v1.0.54/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.55 h1:P5RISBp8kUowgb/bvqLPVKPJL9n9jI/wXBCLs+XFMWg=
github.com/kerberos-io/joy4 v1.0.55/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=

View File

@@ -25,7 +25,19 @@ func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
return nil, []av.CodecData{}, err
}
func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
func GetVideoStream(streams []av.CodecData) (av.CodecData, error) {
var videoStream av.CodecData
for _, stream := range streams {
if stream.Type().IsAudio() {
//astream := stream.(av.AudioCodecData)
} else if stream.Type().IsVideo() {
videoStream = stream
}
}
return videoStream, nil
}
func GetVideoDecoder(decoder *ffmpeg.VideoDecoder, streams []av.CodecData) {
// Load video codec
var vstream av.VideoCodecData
for _, stream := range streams {
@@ -35,8 +47,10 @@ func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
vstream = stream.(av.VideoCodecData)
}
}
dec, _ := ffmpeg.NewVideoDecoder(vstream)
return dec
err := ffmpeg.NewVideoDecoder(decoder, vstream)
if err != nil {
log.Log.Error("GetVideoDecoder: " + err.Error())
}
}
func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {

View File

@@ -1,8 +1,6 @@
package components
import (
"bufio"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@@ -42,27 +40,6 @@ func GetImageFromFilePath() (image.Image, error) {
return nil, errors.New("Could not find a snapshot in " + snapshotDirectory)
}
func GetSnapshot() string {
var snapshot string
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil && len(files) > 1 {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
f, _ := os.Open("./data/snapshots/" + files[1].Name())
defer f.Close()
// Read entire JPG into byte slice.
reader := bufio.NewReader(f)
content, _ := ioutil.ReadAll(reader)
// Encode as base64.
snapshot = base64.StdEncoding.EncodeToString(content)
// Close reader
reader = nil
}
return snapshot
}
// ReadUserConfig Reads the user configuration of the Kerberos Open Source instance.
// This will return a models.User struct including the username, password,
// selected language, and if the installation was completed or not.

View File

@@ -1,11 +1,14 @@
package components
import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/cloud"
"github.com/kerberos-io/agent/machinery/src/computervision"
@@ -15,7 +18,6 @@ import (
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/joy4/av"
"github.com/kerberos-io/joy4/av/pubsub"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
"github.com/tevino/abool"
)
@@ -57,12 +59,17 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
// do several checks to see if the agent is still operational.
go ControlAgent(communication)
// Create some global variables
decoder := &ffmpeg.VideoDecoder{}
subDecoder := &ffmpeg.VideoDecoder{}
cameraSettings := &models.Camera{}
// Run the agent and fire up all the other
// goroutines which do image capture, motion detection, onvif, etc.
for {
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
status := RunAgent(configuration, communication, uptimeStart)
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
if status == "stop" {
break
}
@@ -72,7 +79,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
log.Log.Debug("Bootstrap: finished")
}
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time) string {
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
log.Log.Debug("RunAgent: started")
config := configuration.Config
@@ -108,13 +115,42 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
}
}
// At some routines we will need to decode the image.
// Make sure its properly locked as we only have a single decoder.
decoder := capture.GetVideoDecoder(streams)
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
var subDecoder *ffmpeg.VideoDecoder
if subStreamEnabled {
subDecoder = capture.GetVideoDecoder(subStreams)
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
if cameraSettings.Initialized {
decoder.Close()
if subStreamEnabled {
subDecoder.Close()
}
}
// At some routines we will need to decode the image.
// Make sure its properly locked as we only have a single decoder.
log.Log.Info("RunAgent: camera settings changed, reloading decoder")
capture.GetVideoDecoder(decoder, streams)
if subStreamEnabled {
capture.GetVideoDecoder(subDecoder, subStreams)
}
cameraSettings.RTSP = rtspUrl
cameraSettings.SubRTSP = subRtspUrl
cameraSettings.Width = width
cameraSettings.Height = height
cameraSettings.Framerate = float64(num) / float64(denum)
cameraSettings.Num = num
cameraSettings.Denum = denum
cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
cameraSettings.Initialized = true
} else {
log.Log.Info("RunAgent: camera settings did not change, keeping decoder")
}
communication.Decoder = decoder
@@ -239,14 +275,7 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// Wait a few seconds to stop the decoder.
time.Sleep(time.Second * 3)
decoder.Close()
decoder = nil
communication.Decoder = nil
if subStreamEnabled {
subDecoder.Close()
subDecoder = nil
communication.SubDecoder = nil
}
// Waiting for some seconds to make sure everything is properly closed.
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
time.Sleep(time.Second * 3)
@@ -257,6 +286,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
log.Log.Debug("RunAgent: finished")
// Clean up, force garbage collection
runtime.GC()
return status
}

View File

@@ -3,12 +3,9 @@ package computervision
import (
"bufio"
"bytes"
"encoding/base64"
"image"
"image/jpeg"
"io/ioutil"
"os"
"sort"
"strconv"
"sync"
"time"
@@ -130,7 +127,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
// Store snapshots (jpg) for hull.
if config.Capture.Snapshots != "false" {
StoreSnapshot(frame, pkt, decoder, decoderMutex)
StoreSnapshot(communication, frame, pkt, decoder, decoderMutex)
}
// Check if within time interval
@@ -237,25 +234,15 @@ func AbsDiffBitwiseAndThreshold(img1 *image.Gray, img2 *image.Gray, img3 *image.
return changes
}
func StoreSnapshot(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
files, err := ioutil.ReadDir("./data/snapshots")
func StoreSnapshot(communication *models.Communication, frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
buffer := new(bytes.Buffer)
w := bufio.NewWriter(buffer)
err := jpeg.Encode(w, &rgbImage.Image, &jpeg.Options{Quality: 15})
if err == nil {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
if len(files) > 3 {
os.Remove("./data/snapshots/" + files[0].Name())
}
// Save image
t := strconv.FormatInt(time.Now().Unix(), 10)
f, err := os.Create("./data/snapshots/" + t + ".jpg")
if err == nil {
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
f.Close()
}
snapshot := base64.StdEncoding.EncodeToString(buffer.Bytes())
communication.Image = snapshot
}
}
}

View File

@@ -0,0 +1,15 @@
package models
import "github.com/kerberos-io/joy4/av"
type Camera struct {
Width int
Height int
Num int
Denum int
Framerate float64
RTSP string
SubRTSP string
Codec av.CodecType
Initialized bool
}

View File

@@ -33,4 +33,5 @@ type Communication struct {
SubDecoderMutex *sync.Mutex
Decoder *ffmpeg.VideoDecoder
SubDecoder *ffmpeg.VideoDecoder
Image string
}

View File

@@ -29,7 +29,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
"config": configuration.Config,
"custom": configuration.CustomConfig,
"global": configuration.GlobalConfig,
"snapshot": components.GetSnapshot(),
"snapshot": communication.Image,
})
})
@@ -157,7 +157,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
"config": configuration.Config,
"custom": configuration.CustomConfig,
"global": configuration.GlobalConfig,
"snapshot": components.GetSnapshot(),
"snapshot": communication.Image,
})
})

View File

@@ -136,7 +136,7 @@ func ForwardSDStream(ctx context.Context, clientID string, connection *Connectio
logreader:
for {
var encodedImage string
if cursor != nil && decoder != nil {
if queue != nil && cursor != nil && decoder != nil {
pkt, err := cursor.ReadPacket()
if err == nil {
if !pkt.IsKeyFrame {