mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 08:50:08 +00:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
943e81000e | ||
|
|
36b93a34b4 | ||
|
|
b0d2409524 | ||
|
|
be7a231950 | ||
|
|
31a0b9efa4 | ||
|
|
d70a3ed343 | ||
|
|
56cebb6451 | ||
|
|
99f61bc5e8 | ||
|
|
a5d02e3275 | ||
|
|
354ab7db05 | ||
|
|
dc817f8c26 | ||
|
|
a90097731c | ||
|
|
5b3bbbb37e |
@@ -1,2 +1,2 @@
|
||||
FROM kerberos/devcontainer:9da0ee3
|
||||
FROM kerberos/devcontainer:b2bc659
|
||||
LABEL AUTHOR=Kerberos.io
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
{
|
||||
"type": "",
|
||||
"key": "",
|
||||
|
||||
@@ -2,7 +2,8 @@ module github.com/kerberos-io/agent/machinery
|
||||
|
||||
go 1.19
|
||||
|
||||
//replace github.com/kerberos-io/joy4 v1.0.53 => ../../../../github.com/kerberos-io/joy4
|
||||
//replace github.com/kerberos-io/joy4 v1.0.54 => ../../../../github.com/kerberos-io/joy4
|
||||
|
||||
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
|
||||
|
||||
require (
|
||||
@@ -20,7 +21,7 @@ require (
|
||||
github.com/golang-module/carbon/v2 v2.2.3
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.54
|
||||
github.com/kerberos-io/joy4 v1.0.55
|
||||
github.com/kerberos-io/onvif v0.0.5
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
|
||||
@@ -175,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.54 h1:Ct4G00sk/iLqm+wLV0gQWDxnKciAnLiTnuxF8hufcsc=
|
||||
github.com/kerberos-io/joy4 v1.0.54/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/joy4 v1.0.55 h1:P5RISBp8kUowgb/bvqLPVKPJL9n9jI/wXBCLs+XFMWg=
|
||||
github.com/kerberos-io/joy4 v1.0.55/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
|
||||
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=
|
||||
|
||||
@@ -25,7 +25,19 @@ func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
return nil, []av.CodecData{}, err
|
||||
}
|
||||
|
||||
func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
|
||||
func GetVideoStream(streams []av.CodecData) (av.CodecData, error) {
|
||||
var videoStream av.CodecData
|
||||
for _, stream := range streams {
|
||||
if stream.Type().IsAudio() {
|
||||
//astream := stream.(av.AudioCodecData)
|
||||
} else if stream.Type().IsVideo() {
|
||||
videoStream = stream
|
||||
}
|
||||
}
|
||||
return videoStream, nil
|
||||
}
|
||||
|
||||
func GetVideoDecoder(decoder *ffmpeg.VideoDecoder, streams []av.CodecData) {
|
||||
// Load video codec
|
||||
var vstream av.VideoCodecData
|
||||
for _, stream := range streams {
|
||||
@@ -35,8 +47,10 @@ func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
|
||||
vstream = stream.(av.VideoCodecData)
|
||||
}
|
||||
}
|
||||
dec, _ := ffmpeg.NewVideoDecoder(vstream)
|
||||
return dec
|
||||
err := ffmpeg.NewVideoDecoder(decoder, vstream)
|
||||
if err != nil {
|
||||
log.Log.Error("GetVideoDecoder: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -42,27 +40,6 @@ func GetImageFromFilePath() (image.Image, error) {
|
||||
return nil, errors.New("Could not find a snapshot in " + snapshotDirectory)
|
||||
}
|
||||
|
||||
func GetSnapshot() string {
|
||||
var snapshot string
|
||||
files, err := ioutil.ReadDir("./data/snapshots")
|
||||
if err == nil && len(files) > 1 {
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].ModTime().Before(files[j].ModTime())
|
||||
})
|
||||
f, _ := os.Open("./data/snapshots/" + files[1].Name())
|
||||
defer f.Close()
|
||||
// Read entire JPG into byte slice.
|
||||
reader := bufio.NewReader(f)
|
||||
content, _ := ioutil.ReadAll(reader)
|
||||
// Encode as base64.
|
||||
snapshot = base64.StdEncoding.EncodeToString(content)
|
||||
|
||||
// Close reader
|
||||
reader = nil
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
|
||||
// ReadUserConfig Reads the user configuration of the Kerberos Open Source instance.
|
||||
// This will return a models.User struct including the username, password,
|
||||
// selected language, and if the installation was completed or not.
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/cloud"
|
||||
"github.com/kerberos-io/agent/machinery/src/computervision"
|
||||
@@ -15,7 +18,6 @@ import (
|
||||
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
|
||||
"github.com/kerberos-io/joy4/av"
|
||||
"github.com/kerberos-io/joy4/av/pubsub"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
@@ -57,12 +59,17 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
// do several checks to see if the agent is still operational.
|
||||
go ControlAgent(communication)
|
||||
|
||||
// Create some global variables
|
||||
decoder := &ffmpeg.VideoDecoder{}
|
||||
subDecoder := &ffmpeg.VideoDecoder{}
|
||||
cameraSettings := &models.Camera{}
|
||||
|
||||
// Run the agent and fire up all the other
|
||||
// goroutines which do image capture, motion detection, onvif, etc.
|
||||
|
||||
for {
|
||||
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
|
||||
status := RunAgent(configuration, communication, uptimeStart)
|
||||
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
|
||||
if status == "stop" {
|
||||
break
|
||||
}
|
||||
@@ -72,7 +79,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
log.Log.Debug("Bootstrap: finished")
|
||||
}
|
||||
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time) string {
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
|
||||
log.Log.Debug("RunAgent: started")
|
||||
|
||||
config := configuration.Config
|
||||
@@ -108,13 +115,42 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
}
|
||||
}
|
||||
|
||||
// At some routines we will need to decode the image.
|
||||
// Make sure its properly locked as we only have a single decoder.
|
||||
decoder := capture.GetVideoDecoder(streams)
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
// to reload the decoders.
|
||||
videoStream, _ := capture.GetVideoStream(streams)
|
||||
num, denum := videoStream.(av.VideoCodecData).Framerate()
|
||||
width := videoStream.(av.VideoCodecData).Width()
|
||||
height := videoStream.(av.VideoCodecData).Height()
|
||||
|
||||
var subDecoder *ffmpeg.VideoDecoder
|
||||
if subStreamEnabled {
|
||||
subDecoder = capture.GetVideoDecoder(subStreams)
|
||||
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
|
||||
|
||||
if cameraSettings.Initialized {
|
||||
decoder.Close()
|
||||
if subStreamEnabled {
|
||||
subDecoder.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// At some routines we will need to decode the image.
|
||||
// Make sure its properly locked as we only have a single decoder.
|
||||
log.Log.Info("RunAgent: camera settings changed, reloading decoder")
|
||||
capture.GetVideoDecoder(decoder, streams)
|
||||
if subStreamEnabled {
|
||||
capture.GetVideoDecoder(subDecoder, subStreams)
|
||||
}
|
||||
|
||||
cameraSettings.RTSP = rtspUrl
|
||||
cameraSettings.SubRTSP = subRtspUrl
|
||||
cameraSettings.Width = width
|
||||
cameraSettings.Height = height
|
||||
cameraSettings.Framerate = float64(num) / float64(denum)
|
||||
cameraSettings.Num = num
|
||||
cameraSettings.Denum = denum
|
||||
cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
|
||||
cameraSettings.Initialized = true
|
||||
} else {
|
||||
log.Log.Info("RunAgent: camera settings did not change, keeping decoder")
|
||||
}
|
||||
|
||||
communication.Decoder = decoder
|
||||
@@ -239,14 +275,7 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
|
||||
// Wait a few seconds to stop the decoder.
|
||||
time.Sleep(time.Second * 3)
|
||||
decoder.Close()
|
||||
decoder = nil
|
||||
communication.Decoder = nil
|
||||
if subStreamEnabled {
|
||||
subDecoder.Close()
|
||||
subDecoder = nil
|
||||
communication.SubDecoder = nil
|
||||
}
|
||||
|
||||
// Waiting for some seconds to make sure everything is properly closed.
|
||||
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
|
||||
time.Sleep(time.Second * 3)
|
||||
@@ -257,6 +286,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
|
||||
log.Log.Debug("RunAgent: finished")
|
||||
|
||||
// Clean up, force garbage collection
|
||||
runtime.GC()
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
|
||||
@@ -3,12 +3,9 @@ package computervision
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"image"
|
||||
"image/jpeg"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -130,7 +127,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
|
||||
// Store snapshots (jpg) for hull.
|
||||
if config.Capture.Snapshots != "false" {
|
||||
StoreSnapshot(frame, pkt, decoder, decoderMutex)
|
||||
StoreSnapshot(communication, frame, pkt, decoder, decoderMutex)
|
||||
}
|
||||
|
||||
// Check if within time interval
|
||||
@@ -237,25 +234,15 @@ func AbsDiffBitwiseAndThreshold(img1 *image.Gray, img2 *image.Gray, img3 *image.
|
||||
return changes
|
||||
}
|
||||
|
||||
func StoreSnapshot(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
files, err := ioutil.ReadDir("./data/snapshots")
|
||||
func StoreSnapshot(communication *models.Communication, frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
|
||||
buffer := new(bytes.Buffer)
|
||||
w := bufio.NewWriter(buffer)
|
||||
err := jpeg.Encode(w, &rgbImage.Image, &jpeg.Options{Quality: 15})
|
||||
if err == nil {
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].ModTime().Before(files[j].ModTime())
|
||||
})
|
||||
if len(files) > 3 {
|
||||
os.Remove("./data/snapshots/" + files[0].Name())
|
||||
}
|
||||
|
||||
// Save image
|
||||
t := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
f, err := os.Create("./data/snapshots/" + t + ".jpg")
|
||||
if err == nil {
|
||||
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
|
||||
f.Close()
|
||||
}
|
||||
snapshot := base64.StdEncoding.EncodeToString(buffer.Bytes())
|
||||
communication.Image = snapshot
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
15
machinery/src/models/Camera.go
Normal file
15
machinery/src/models/Camera.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package models
|
||||
|
||||
import "github.com/kerberos-io/joy4/av"
|
||||
|
||||
type Camera struct {
|
||||
Width int
|
||||
Height int
|
||||
Num int
|
||||
Denum int
|
||||
Framerate float64
|
||||
RTSP string
|
||||
SubRTSP string
|
||||
Codec av.CodecType
|
||||
Initialized bool
|
||||
}
|
||||
@@ -33,4 +33,5 @@ type Communication struct {
|
||||
SubDecoderMutex *sync.Mutex
|
||||
Decoder *ffmpeg.VideoDecoder
|
||||
SubDecoder *ffmpeg.VideoDecoder
|
||||
Image string
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
|
||||
"config": configuration.Config,
|
||||
"custom": configuration.CustomConfig,
|
||||
"global": configuration.GlobalConfig,
|
||||
"snapshot": components.GetSnapshot(),
|
||||
"snapshot": communication.Image,
|
||||
})
|
||||
})
|
||||
|
||||
@@ -157,7 +157,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
|
||||
"config": configuration.Config,
|
||||
"custom": configuration.CustomConfig,
|
||||
"global": configuration.GlobalConfig,
|
||||
"snapshot": components.GetSnapshot(),
|
||||
"snapshot": communication.Image,
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ func ForwardSDStream(ctx context.Context, clientID string, connection *Connectio
|
||||
logreader:
|
||||
for {
|
||||
var encodedImage string
|
||||
if cursor != nil && decoder != nil {
|
||||
if queue != nil && cursor != nil && decoder != nil {
|
||||
pkt, err := cursor.ReadPacket()
|
||||
if err == nil {
|
||||
if !pkt.IsKeyFrame {
|
||||
|
||||
Reference in New Issue
Block a user