Compare commits

...

24 Commits

Author SHA1 Message Date
Cedric Verstraeten
b0d2409524 stop the motion and livestreaming threads first 2023-02-23 14:58:48 +01:00
Cedric Verstraeten
be7a231950 reset configuration 2023-02-23 12:20:25 +01:00
Cedric Verstraeten
31a0b9efa4 enable the rest! 2023-02-21 22:54:33 +01:00
Cedric Verstraeten
d70a3ed343 do not reload decoder if same settings 2023-02-21 22:44:56 +01:00
Cedric Verstraeten
56cebb6451 manage decoder from higher level 2023-02-21 22:12:08 +01:00
Cedric Verstraeten
99f61bc5e8 enable decoder again ;) 2023-02-21 21:45:54 +01:00
Cedric Verstraeten
a5d02e3275 disable decoder, see what it's doing from mem consumption 2023-02-21 21:31:17 +01:00
Cedric Verstraeten
354ab7db05 add garbage collection 2023-02-21 20:49:44 +01:00
Cedric Verstraeten
dc817f8c26 keep snapshot in memory (no longer store on disk) 2023-02-21 12:54:28 +01:00
Cedric Verstraeten
a90097731c fix error 2023-02-21 12:13:29 +01:00
Cedric Verstraeten
5b3bbbb37e new approach to store snapshots! 2023-02-21 12:12:30 +01:00
Cedric Verstraeten
4a4aabd71c upgrade to joy v1.0.54 2023-02-19 22:04:39 +01:00
Cedric Verstraeten
b058c1e742 set pointers to nil 2023-02-18 22:14:33 +01:00
Cedric Verstraeten
7671b1c2c3 unsubscribe from mqtt subscriptions 2023-02-18 22:12:44 +01:00
Cedric Verstraeten
4cc8135e1a move StoreSnapshot to separate method 2023-02-17 20:03:15 +01:00
Cedric Verstraeten
3cb38099ea add process memory + boot time 2023-02-15 13:01:16 +01:00
Cedric Verstraeten
deb0308dc4 rename attributes 2023-02-15 07:02:16 +01:00
Cedric Verstraeten
24c729eea3 Update Cloud.go 2023-02-14 23:11:55 +01:00
Cedric Verstraeten
c59d511ea3 fix for macs and ips 2023-02-14 22:55:18 +01:00
Cedric Verstraeten
6f8745dc3a alignment of motion recordings, make sure there is no overlap between two sibling recordings 2023-02-14 16:59:00 +01:00
Cedric Verstraeten
65d3d649b9 disable liveview (hd/sd) through env 2023-02-14 11:26:50 +01:00
Cedric Verstraeten
b4a8028c04 better way of doing prerecording + matching timestamp with prerecord time 2023-02-14 08:57:55 +01:00
Cedric Verstraeten
9d7077813a use timescale 10000000 2023-02-11 21:32:36 +01:00
Cedric Verstraeten
2feda33808 joy4 v1.0.51 2023-02-11 20:52:36 +01:00
16 changed files with 349 additions and 271 deletions

View File

@@ -1,3 +1,4 @@
{
"type": "",
"key": "",

View File

@@ -2,7 +2,8 @@ module github.com/kerberos-io/agent/machinery
go 1.19
//replace github.com/kerberos-io/joy4 v1.0.51 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.54 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
require (
@@ -20,7 +21,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.52
github.com/kerberos-io/joy4 v1.0.55
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -175,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.52 h1:x08j6oz3umQMDMiKSLVspAojNhoyyloXYHhnEcD+0Qc=
github.com/kerberos-io/joy4 v1.0.52/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.55 h1:P5RISBp8kUowgb/bvqLPVKPJL9n9jI/wXBCLs+XFMWg=
github.com/kerberos-io/joy4 v1.0.55/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=

View File

@@ -25,7 +25,19 @@ func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
return nil, []av.CodecData{}, err
}
func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
func GetVideoStream(streams []av.CodecData) (av.CodecData, error) {
var videoStream av.CodecData
for _, stream := range streams {
if stream.Type().IsAudio() {
//astream := stream.(av.AudioCodecData)
} else if stream.Type().IsVideo() {
videoStream = stream
}
}
return videoStream, nil
}
func GetVideoDecoder(decoder *ffmpeg.VideoDecoder, streams []av.CodecData) {
// Load video codec
var vstream av.VideoCodecData
for _, stream := range streams {
@@ -35,8 +47,10 @@ func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
vstream = stream.(av.VideoCodecData)
}
}
dec, _ := ffmpeg.NewVideoDecoder(vstream)
return dec
err := ffmpeg.NewVideoDecoder(decoder, vstream)
if err != nil {
log.Log.Error("GetVideoDecoder: " + err.Error())
}
}
func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
@@ -46,6 +60,60 @@ func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoD
return img, err
}
func GetStreamInsights(infile av.DemuxCloser, streams []av.CodecData) (int, int, int, int) {
var width, height, fps, gopsize int
for _, stream := range streams {
if stream.Type().IsAudio() {
//astream := stream.(av.AudioCodecData)
} else if stream.Type().IsVideo() {
vstream := stream.(av.VideoCodecData)
width = vstream.Width()
height = vstream.Height()
}
}
loop:
for timeout := time.After(1 * time.Second); ; {
var err error
if _, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
log.Log.Error("HandleStream: " + err.Error())
}
fps++
select {
case <-timeout:
break loop
default:
}
}
gopCounter := 0
start := false
for {
var pkt av.Packet
var err error
if pkt, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
log.Log.Error("HandleStream: " + err.Error())
}
// Could be that a decode is throwing errors.
if len(pkt.Data) > 0 {
if start {
gopCounter = gopCounter + 1
}
if pkt.IsKeyFrame {
if start == false {
start = true
} else {
gopsize = gopCounter
break
}
}
}
}
return width, height, fps, gopsize
}
func HandleStream(infile av.DemuxCloser, queue *pubsub.Queue, communication *models.Communication) { //, wg *sync.WaitGroup) {
log.Log.Debug("HandleStream: started")

View File

@@ -272,12 +272,30 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
var file *os.File
var err error
var lastDuration time.Duration
var lastRecordingTime int64
for motion := range communication.HandleMotion {
timestamp = time.Now().Unix()
startRecording = time.Now().Unix() // we mark the current time when the record started.
numberOfChanges := motion.NumberOfChanges
// If we have prerecording we will substract the number of seconds.
// Taking into account FPS = GOP size (Keyfram interval)
if config.Capture.PreRecording > 0 {
// Might be that recordings are coming short after each other.
// Therefore we do some math with the current time and the last recording time.
timeBetweenNowAndLastRecording := startRecording - lastRecordingTime
if timeBetweenNowAndLastRecording > int64(config.Capture.PreRecording) {
startRecording = startRecording - int64(config.Capture.PreRecording) + 1
} else {
startRecording = startRecording - timeBetweenNowAndLastRecording
}
}
// timestamp_microseconds_instanceName_regionCoordinates_numberOfChanges_token
// 1564859471_6-474162_oprit_577-283-727-375_1153_27.mp4
// - Timestamp
@@ -318,7 +336,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
var cursorError error
var pkt av.Packet
var nextPkt av.Packet
recordingCursor := queue.Oldest()
recordingCursor := queue.DelayedGopCount(int(config.Capture.PreRecording))
if cursorError == nil {
pkt, cursorError = recordingCursor.ReadPacket()
@@ -345,7 +363,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
log.Log.Info("HandleRecordStream: closing recording (timestamp: " + strconv.FormatInt(timestamp, 10) + ", recordingPeriod: " + strconv.FormatInt(recordingPeriod, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
break
}
if pkt.IsKeyFrame && !start {
if pkt.IsKeyFrame && !start && pkt.Time >= lastDuration {
log.Log.Info("HandleRecordStream: write frames")
start = true
}
@@ -372,6 +390,9 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
myMuxer.WriteTrailerWithPacket(nextPkt)
log.Log.Info("HandleRecordStream: file save: " + name)
lastDuration = pkt.Time
lastRecordingTime = time.Now().Unix()
// Cleanup muxer
myMuxer.Close()
myMuxer = nil

View File

@@ -95,6 +95,9 @@ func GetSystemInfo() (models.System, error) {
var usedMem uint64 = 0
var totalMem uint64 = 0
var freeMem uint64 = 0
var processUsedMem uint64 = 0
architecture := ""
cpuId := ""
KernelVersion := ""
@@ -133,18 +136,27 @@ func GetSystemInfo() (models.System, error) {
}
}
process, err := sysinfo.Self()
if err == nil {
memInfo, err := process.Memory()
if err == nil {
processUsedMem = memInfo.Resident
}
}
system := models.System{
Hostname: hostname,
CPUId: cpuId,
KernelVersion: KernelVersion,
Version: agentVersion,
MACs: MACs,
IPs: IPs,
BootTime: uint64(bootTime.Unix()),
Architecture: architecture,
UsedMemory: usedMem,
TotalMemory: totalMem,
FreeMemory: freeMem,
Hostname: hostname,
CPUId: cpuId,
KernelVersion: KernelVersion,
Version: agentVersion,
MACs: MACs,
IPs: IPs,
BootTime: uint64(bootTime.Unix()),
Architecture: architecture,
UsedMemory: usedMem,
TotalMemory: totalMem,
FreeMemory: freeMem,
ProcessUsedMemory: processUsedMem,
}
return system, nil
@@ -200,6 +212,11 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
uptimeString := carbon.Parse(uptimeFormatted).DiffForHumans()
uptimeString = strings.ReplaceAll(uptimeString, "ago", "")
// Do the same for boottime
bootTimeFormatted := time.Unix(int64(system.BootTime), 0).Format("2006-01-02 15:04:05")
boottimeString := carbon.Parse(bootTimeFormatted).DiffForHumans()
boottimeString = strings.ReplaceAll(boottimeString, "ago", "")
// We'll check which mode is enabled for the camera.
onvifEnabled := "false"
if config.Capture.IPCamera.ONVIFXAddr != "" {
@@ -221,6 +238,10 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
isEnterprise = true
}
// Congert to string
macs, _ := json.Marshal(system.MACs)
ips, _ := json.Marshal(system.IPs)
var object = fmt.Sprintf(`{
"key" : "%s",
"version" : "3.0.0",
@@ -235,13 +256,15 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
"totalMemory" : "%d",
"usedMemory" : "%d",
"freeMemory" : "%d",
"macs" : "%v",
"ips" : "%v",
"processMemory" : "%d",
"mac_list" : %s,
"ip_list" : %s,
"board" : "",
"disk1size" : "%s",
"disk3size" : "%s",
"diskvdasize" : "%s",
"uptime" : "%s",
"boot_time" : "%s",
"siteID" : "%s",
"onvif" : "%s",
"numberoffiles" : "33",
@@ -250,7 +273,7 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
"docker" : true,
"kios" : false,
"raspberrypi" : false
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.MACs, system.IPs, "0", "0", "0", uptimeString, config.HubSite, onvifEnabled)
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled)
var jsonStr = []byte(object)
buffy := bytes.NewBuffer(jsonStr)
@@ -308,51 +331,59 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
config := configuration.Config
// If offline made is enabled, we will stop the thread.
if config.Offline == "true" {
log.Log.Debug("HandleLiveStreamSD: stopping as Offline is enabled.")
} else {
// Allocate frame
frame := ffmpeg.AllocVideoFrame()
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
key := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
key = config.KStorage.CloudKey
}
// This is the new way ;)
if config.HubKey != "" {
key = config.HubKey
}
// Allocate frame
frame := ffmpeg.AllocVideoFrame()
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
lastLivestreamRequest := int64(0)
var cursorError error
var pkt av.Packet
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
continue
key := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
key = config.KStorage.CloudKey
}
now := time.Now().Unix()
select {
case <-communication.HandleLiveSD:
lastLivestreamRequest = now
default:
// This is the new way ;)
if config.HubKey != "" {
key = config.HubKey
}
if now-lastLivestreamRequest > 3 {
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
}
// Cleanup the frame.
frame.Free()
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
lastLivestreamRequest := int64(0)
var cursorError error
var pkt av.Packet
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
continue
}
now := time.Now().Unix()
select {
case <-communication.HandleLiveSD:
lastLivestreamRequest = now
default:
}
if now-lastLivestreamRequest > 3 {
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
}
// Cleanup the frame.
frame.Free()
} else {
log.Log.Debug("HandleLiveStreamSD: stopping as Liveview is disabled.")
}
}
log.Log.Debug("HandleLiveStreamSD: finished")
@@ -375,34 +406,41 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Debug("HandleLiveStreamHD: stopping as Offline is enabled.")
} else {
// Should create a track here.
track := webrtc.NewVideoTrack()
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
track := webrtc.NewVideoTrack()
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
if config.Capture.ForwardWebRTC == "true" {
// We get a request with an offer, but we'll forward it.
for m := range communication.HandleLiveHDHandshake {
// Forward SDP
m.CloudKey = config.Key
request, err := json.Marshal(m)
if err == nil {
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
}
}
} else {
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.Cuuid
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string, 30)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
if config.Capture.ForwardWebRTC == "true" {
// We get a request with an offer, but we'll forward it.
for m := range communication.HandleLiveHDHandshake {
// Forward SDP
m.CloudKey = config.Key
request, err := json.Marshal(m)
if err == nil {
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
}
}
} else {
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.Cuuid
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string, 30)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
}
log.Log.Debug("HandleLiveStreamHD: stopping as Liveview is disabled.")
}
}
}

View File

@@ -1,8 +1,6 @@
package components
import (
"bufio"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@@ -42,24 +40,6 @@ func GetImageFromFilePath() (image.Image, error) {
return nil, errors.New("Could not find a snapshot in " + snapshotDirectory)
}
func GetSnapshot() string {
var snapshot string
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil && len(files) > 1 {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
f, _ := os.Open("./data/snapshots/" + files[1].Name())
defer f.Close()
// Read entire JPG into byte slice.
reader := bufio.NewReader(f)
content, _ := ioutil.ReadAll(reader)
// Encode as base64.
snapshot = base64.StdEncoding.EncodeToString(content)
}
return snapshot
}
// ReadUserConfig Reads the user configuration of the Kerberos Open Source instance.
// This will return a models.User struct including the username, password,
// selected language, and if the installation was completed or not.
@@ -146,6 +126,9 @@ func OpenConfig(configuration *models.Configuration) {
conjungo.Merge(&s3, configuration.CustomConfig.S3, opts)
configuration.Config.S3 = &s3
// Cleanup
opts = nil
} else if os.Getenv("DEPLOYMENT") == "" || os.Getenv("DEPLOYMENT") == "agent" {
// Local deployment means we do a stand-alone installation
@@ -244,6 +227,9 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
case "AGENT_CAPTURE_CONTINUOUS":
configuration.Config.Capture.Continuous = value
break
case "AGENT_CAPTURE_LIVEVIEW":
configuration.Config.Capture.Liveview = value
break
case "AGENT_CAPTURE_MOTION":
configuration.Config.Capture.Motion = value
break

View File

@@ -1,11 +1,14 @@
package components
import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/cloud"
"github.com/kerberos-io/agent/machinery/src/computervision"
@@ -15,7 +18,6 @@ import (
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/joy4/av"
"github.com/kerberos-io/joy4/av/pubsub"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
"github.com/tevino/abool"
)
@@ -57,12 +59,17 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
// do several checks to see if the agent is still operational.
go ControlAgent(communication)
// Create some global variables
decoder := &ffmpeg.VideoDecoder{}
subDecoder := &ffmpeg.VideoDecoder{}
cameraSettings := &models.Camera{}
// Run the agent and fire up all the other
// goroutines which do image capture, motion detection, onvif, etc.
for {
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
status := RunAgent(configuration, communication, uptimeStart)
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
if status == "stop" {
break
}
@@ -72,7 +79,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
log.Log.Debug("Bootstrap: finished")
}
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time) string {
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
log.Log.Debug("RunAgent: started")
config := configuration.Config
@@ -84,6 +91,10 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
infile, streams, err := capture.OpenRTSP(rtspUrl)
var queue *pubsub.Queue
var subQueue *pubsub.Queue
var decoderMutex sync.Mutex
var subDecoderMutex sync.Mutex
status := "not started"
@@ -104,15 +115,42 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
}
}
// At some routines we will need to decode the image.
// Make sure its properly locked as we only have a single decoder.
var decoderMutex sync.Mutex
var subDecoderMutex sync.Mutex
decoder := capture.GetVideoDecoder(streams)
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
// to reload the decoders.
videoStream, _ := capture.GetVideoStream(streams)
num, denum := videoStream.(av.VideoCodecData).Framerate()
width := videoStream.(av.VideoCodecData).Width()
height := videoStream.(av.VideoCodecData).Height()
var subDecoder *ffmpeg.VideoDecoder
if subStreamEnabled {
subDecoder = capture.GetVideoDecoder(subStreams)
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
if cameraSettings.Initialized {
decoder.Close()
if subStreamEnabled {
subDecoder.Close()
}
}
// At some routines we will need to decode the image.
// Make sure its properly locked as we only have a single decoder.
log.Log.Info("RunAgent: camera settings changed, reloading decoder")
capture.GetVideoDecoder(decoder, streams)
if subStreamEnabled {
capture.GetVideoDecoder(subDecoder, subStreams)
}
cameraSettings.RTSP = rtspUrl
cameraSettings.SubRTSP = subRtspUrl
cameraSettings.Width = width
cameraSettings.Height = height
cameraSettings.Framerate = float64(num) / float64(denum)
cameraSettings.Num = num
cameraSettings.Denum = denum
cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
cameraSettings.Initialized = true
} else {
log.Log.Info("RunAgent: camera settings did not change, keeping decoder")
}
communication.Decoder = decoder
@@ -131,15 +169,15 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// processed by the different consumers: motion detection, recording, etc.
queue = pubsub.NewQueue()
communication.Queue = queue
queue.SetMaxGopCount(int(config.Capture.PreRecording)) // GOP time frame is set to prerecording.
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)))
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
queue.WriteHeader(streams)
// We might have a substream, if so we'll create a seperate queue.
var subQueue *pubsub.Queue
if subStreamEnabled {
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(config.Capture.PreRecording)))
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(1)))
subQueue = pubsub.NewQueue()
communication.SubQueue = subQueue
subQueue.SetMaxGopCount(1)
subQueue.WriteHeader(subStreams)
}
@@ -202,6 +240,13 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// the agent will cleanup and restart.
status = <-communication.HandleBootstrap
close(communication.HandleONVIF)
communication.HandleONVIF = nil
close(communication.HandleLiveHDHandshake)
communication.HandleLiveHDHandshake = nil
close(communication.HandleMotion)
communication.HandleMotion = nil
// Here we are cleaning up everything!
if configuration.Config.Offline != "true" {
communication.HandleHeartBeat <- "stop"
@@ -214,22 +259,24 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
time.Sleep(time.Second * 1)
infile.Close()
infile = nil
queue.Close()
queue = nil
communication.Queue = nil
if subStreamEnabled {
subInfile.Close()
subInfile = nil
subQueue.Close()
subQueue = nil
communication.SubQueue = nil
}
close(communication.HandleONVIF)
close(communication.HandleLiveHDHandshake)
close(communication.HandleMotion)
routers.DisconnectMQTT(mqttClient)
// Disconnect MQTT
routers.DisconnectMQTT(mqttClient, &configuration.Config)
// Wait a few seconds to stop the decoder.
time.Sleep(time.Second * 3)
decoder.Close()
if subStreamEnabled {
subDecoder.Close()
}
// Waiting for some seconds to make sure everything is properly closed.
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
time.Sleep(time.Second * 3)
@@ -240,6 +287,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
log.Log.Debug("RunAgent: finished")
// Clean up, force garbage collection
runtime.GC()
return status
}

View File

@@ -3,12 +3,9 @@ package computervision
import (
"bufio"
"bytes"
"encoding/base64"
"image"
"image/jpeg"
"io/ioutil"
"os"
"sort"
"strconv"
"sync"
"time"
@@ -130,26 +127,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
// Store snapshots (jpg) for hull.
if config.Capture.Snapshots != "false" {
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil {
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
if len(files) > 3 {
os.Remove("./data/snapshots/" + files[0].Name())
}
// Save image
t := strconv.FormatInt(time.Now().Unix(), 10)
f, err := os.Create("./data/snapshots/" + t + ".jpg")
if err == nil {
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
f.Close()
}
}
}
StoreSnapshot(communication, frame, pkt, decoder, decoderMutex)
}
// Check if within time interval
@@ -255,3 +233,16 @@ func AbsDiffBitwiseAndThreshold(img1 *image.Gray, img2 *image.Gray, img3 *image.
}
return changes
}
func StoreSnapshot(communication *models.Communication, frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
buffer := new(bytes.Buffer)
w := bufio.NewWriter(buffer)
err := jpeg.Encode(w, &rgbImage.Image, &jpeg.Options{Quality: 15})
if err == nil {
snapshot := base64.StdEncoding.EncodeToString(buffer.Bytes())
communication.Image = snapshot
}
}
}

View File

@@ -0,0 +1,15 @@
package models
import "github.com/kerberos-io/joy4/av"
type Camera struct {
Width int
Height int
Num int
Denum int
Framerate float64
RTSP string
SubRTSP string
Codec av.CodecType
Initialized bool
}

View File

@@ -28,8 +28,10 @@ type Communication struct {
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool
Queue *pubsub.Queue
SubQueue *pubsub.Queue
DecoderMutex *sync.Mutex
SubDecoderMutex *sync.Mutex
Decoder *ffmpeg.VideoDecoder
SubDecoder *ffmpeg.VideoDecoder
Image string
}

View File

@@ -53,6 +53,7 @@ type Capture struct {
Continuous string `json:"continuous,omitempty"`
Snapshots string `json:"snapshots,omitempty"`
Motion string `json:"motion,omitempty"`
Liveview string `json:"liveview,omitempty"`
PostRecording int64 `json:"postrecording"`
PreRecording int64 `json:"prerecording"`
MaxLengthRecording int64 `json:"maxlengthrecording"`

View File

@@ -1,16 +1,17 @@
package models
type System struct {
CPUId string `json:"cpu_idle" bson:"cpu_idle"`
Hostname string `json:"hostname" bson:"hostname"`
Version string `json:"version" bson:"version"`
Release string `json:"release" bson:"release"`
BootTime uint64 `json:"boot_time" bson:"boot_time"`
KernelVersion string `json:"kernel_version" bson:"kernel_version"`
MACs []string `json:"macs" bson:"macs"`
IPs []string `json:"ips" bson:"ips"`
Architecture string `json:"architecture" bson:"architecture"`
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
FreeMemory uint64 `json:"free_memory" bson:"free_memory"`
CPUId string `json:"cpu_idle" bson:"cpu_idle"`
Hostname string `json:"hostname" bson:"hostname"`
Version string `json:"version" bson:"version"`
Release string `json:"release" bson:"release"`
BootTime uint64 `json:"boot_time" bson:"boot_time"`
KernelVersion string `json:"kernel_version" bson:"kernel_version"`
MACs []string `json:"macs" bson:"macs"`
IPs []string `json:"ips" bson:"ips"`
Architecture string `json:"architecture" bson:"architecture"`
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
FreeMemory uint64 `json:"free_memory" bson:"free_memory"`
ProcessUsedMemory uint64 `json:"process_used_memory" bson:"process_used_memory"`
}

View File

@@ -29,7 +29,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
"config": configuration.Config,
"custom": configuration.CustomConfig,
"global": configuration.GlobalConfig,
"snapshot": components.GetSnapshot(),
"snapshot": communication.Image,
})
})
@@ -157,7 +157,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
"config": configuration.Config,
"custom": configuration.CustomConfig,
"global": configuration.GlobalConfig,
"snapshot": components.GetSnapshot(),
"snapshot": communication.Image,
})
})

View File

@@ -193,8 +193,16 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
})
}
func DisconnectMQTT(mqttClient mqtt.Client) {
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
if mqttClient != nil {
// Cleanup all subscriptions.
mqttClient.Unsubscribe("kerberos/" + config.HubKey + "/device/" + config.Key + "/request-live")
mqttClient.Unsubscribe(config.Key + "/register")
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + config.Key)
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + config.Key)
mqttClient.Unsubscribe("candidate/cloud")
mqttClient.Unsubscribe("kerberos/onvif/" + config.Key)
mqttClient.Disconnect(1000)
mqttClient = nil
}
}

View File

@@ -253,7 +253,7 @@ func CreateFragmentedMP4(fullName string, fragmentedDuration int64) {
path, _ := os.Getwd()
duration := fragmentedDuration * 1000
// This timescale is crucial, as it should be the same as the one defined in JOY4.
cmd := exec.Command("mp4fragment", "--timescale", "90000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
cmd := exec.Command("mp4fragment", "--timescale", "10000000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
cmd.Dir = path
log.Log.Info(cmd.String())
var out bytes.Buffer
@@ -271,108 +271,3 @@ func CreateFragmentedMP4(fullName string, fragmentedDuration int64) {
os.Remove(fullName)
os.Rename(fullName+"f.mp4", fullName)
}
/*func FloatToString(input_num float64) string {
// to convert a float number to a string
return strconv.FormatFloat(input_num, 'f', 6, 64)
}
func ParseFMP4(file *bytes.Reader) (error, uint64, uint64, []*mp4.MediaSegment, uint64) {
var ftypSize, moovSize uint64
var segments []*mp4.MediaSegment
var duration uint64
parsedMp4, err := mp4.DecodeFile(file, mp4.WithDecodeMode(mp4.DecModeLazyMdat))
if parsedMp4 != nil && parsedMp4.Init != nil && err == nil {
ftypSize = parsedMp4.Init.Ftyp.Size()
moovSize = parsedMp4.Init.Moov.Size()
duration = parsedMp4.Moov.Trak.Tkhd.Duration
segments = parsedMp4.Segments
}
return err, ftypSize, moovSize, segments, duration
}
func ParseFMP4Detail(fullName string) {
// open fullName
file, err := os.Open(fullName)
if err != nil {
log.Log.Error("Error opening file: " + err.Error())
}
defer file.Close()
fileBytes, err := ioutil.ReadAll(file)
if err != nil {
log.Log.Error("Error reading file: " + err.Error())
}
fileReader := bytes.NewReader(fileBytes)
err, ftypSize, moovSize, segments, dur := ParseFMP4(fileReader)
fmt.Println("========== Fragmented details =================")
fmt.Println(dur)
fmt.Println(ftypSize)
fmt.Println(moovSize)
fmt.Println(len(segments))
// Calculate duration of segments
var totalDuration uint64
for _, segment := range segments {
fragments := segment.Fragments
for _, fragment := range fragments {
var sampleDuration uint32
samples := fragment.Moof.Traf.Trun.Samples
for _, sample := range samples {
sampleDuration += sample.Dur
}
fmt.Println(sampleDuration)
totalDuration += uint64(sampleDuration)
}
}
misingDuration := dur - totalDuration/100
fmt.Println(misingDuration)
fmt.Println(totalDuration/100 + misingDuration)
}
func CreateFragmentByteRanges(log log.Logging, fileName string, ftypSize uint64, moovSize uint64, segments []*mp4.MediaSegment) (string, []models.BytesRangeOnTime) {
size := strconv.FormatInt(int64(ftypSize+moovSize), 10)
var fileFragments strings.Builder
fileFragments.WriteString("#EXT-X-MAP:URI=\"" + fileName + "\",BYTERANGE=\"" + size + "@0\"\n")
var bytesRangeOnTime []models.BytesRangeOnTime
var currentTime uint32
for _, segment := range segments {
fragments := segment.Fragments
for _, fragment := range fragments {
var sampleDuration uint32
samples := fragment.Moof.Traf.Trun.Samples
for _, sample := range samples {
sampleDuration += sample.Dur
}
currentTime = currentTime + sampleDuration
duration := FloatToString(float64(sampleDuration / 100000))
startPos := fragment.Moof.StartPos
start := strconv.FormatInt(int64(startPos), 10)
totalSize := fragment.Mdat.Size() + fragment.Moof.Size()
size := strconv.FormatInt(int64(totalSize), 10)
fileFragments.WriteString("#EXTINF:" + duration + ",\n") // @TODO calculate the duration
fileFragments.WriteString("#EXT-X-BYTERANGE:" + size + "@" + start + "\n")
fileFragments.WriteString(fileName + "\n")
byteRange := models.BytesRangeOnTime{
Duration: duration,
Time: FloatToString(float64(currentTime) / 100000),
Range: size + "@" + start,
}
bytesRangeOnTime = append(bytesRangeOnTime, byteRange)
}
}
bytesRanges := fileFragments.String()
log.Info("Fragments calculate from " + fileName + ": " + bytesRanges)
return bytesRanges, bytesRangeOnTime
}*/