Compare commits

...

10 Commits

Author SHA1 Message Date
Cedric Verstraeten
24c729eea3 Update Cloud.go 2023-02-14 23:11:55 +01:00
Cedric Verstraeten
c59d511ea3 fix for macs and ips 2023-02-14 22:55:18 +01:00
Cedric Verstraeten
6f8745dc3a alignment of motion recordings, make sure there is no overlap between two sibling recordings 2023-02-14 16:59:00 +01:00
Cedric Verstraeten
65d3d649b9 disable liveview (hd/sd) through env 2023-02-14 11:26:50 +01:00
Cedric Verstraeten
b4a8028c04 better way of doing prerecording + matching timestamp with prerecord time 2023-02-14 08:57:55 +01:00
Cedric Verstraeten
9d7077813a use timescale 10000000 2023-02-11 21:32:36 +01:00
Cedric Verstraeten
2feda33808 joy4 v1.0.51 2023-02-11 20:52:36 +01:00
Cedric Verstraeten
ec42b9ea85 update timescale to 90000 2023-02-11 12:27:38 +01:00
Cedric Verstraeten
a2b4ee12ec align timescale to 90000 2023-02-11 12:02:04 +01:00
Cedric Verstraeten
a0f99a5167 add two new variables to disable snapshotting (encoding time) + disable motion 2023-02-07 13:55:38 +01:00
10 changed files with 215 additions and 110 deletions

View File

@@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery
go 1.19
//replace github.com/kerberos-io/joy4 v1.0.50 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.51 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
require (
@@ -20,7 +20,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.51
github.com/kerberos-io/joy4 v1.0.53
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -175,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.51 h1:RxpXVkZIw1cfJEBPbfqdlwHfZtuDiLb/U25Na7jvPgo=
github.com/kerberos-io/joy4 v1.0.51/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.53 h1:DfVptCUzo/77xLUIwnp1/dbcVffmT0DKPDduQBcu26Y=
github.com/kerberos-io/joy4 v1.0.53/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=

View File

@@ -46,6 +46,60 @@ func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoD
return img, err
}
func GetStreamInsights(infile av.DemuxCloser, streams []av.CodecData) (int, int, int, int) {
var width, height, fps, gopsize int
for _, stream := range streams {
if stream.Type().IsAudio() {
//astream := stream.(av.AudioCodecData)
} else if stream.Type().IsVideo() {
vstream := stream.(av.VideoCodecData)
width = vstream.Width()
height = vstream.Height()
}
}
loop:
for timeout := time.After(1 * time.Second); ; {
var err error
if _, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
log.Log.Error("HandleStream: " + err.Error())
}
fps++
select {
case <-timeout:
break loop
default:
}
}
gopCounter := 0
start := false
for {
var pkt av.Packet
var err error
if pkt, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
log.Log.Error("HandleStream: " + err.Error())
}
// Could be that a decode is throwing errors.
if len(pkt.Data) > 0 {
if start {
gopCounter = gopCounter + 1
}
if pkt.IsKeyFrame {
if start == false {
start = true
} else {
gopsize = gopCounter
break
}
}
}
}
return width, height, fps, gopsize
}
func HandleStream(infile av.DemuxCloser, queue *pubsub.Queue, communication *models.Communication) { //, wg *sync.WaitGroup) {
log.Log.Debug("HandleStream: started")

View File

@@ -272,12 +272,30 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
var file *os.File
var err error
var lastDuration time.Duration
var lastRecordingTime int64
for motion := range communication.HandleMotion {
timestamp = time.Now().Unix()
startRecording = time.Now().Unix() // we mark the current time when the record started.
numberOfChanges := motion.NumberOfChanges
// If we have prerecording we will substract the number of seconds.
// Taking into account FPS = GOP size (Keyfram interval)
if config.Capture.PreRecording > 0 {
// Might be that recordings are coming short after each other.
// Therefore we do some math with the current time and the last recording time.
timeBetweenNowAndLastRecording := startRecording - lastRecordingTime
if timeBetweenNowAndLastRecording > int64(config.Capture.PreRecording) {
startRecording = startRecording - int64(config.Capture.PreRecording) + 1
} else {
startRecording = startRecording - timeBetweenNowAndLastRecording
}
}
// timestamp_microseconds_instanceName_regionCoordinates_numberOfChanges_token
// 1564859471_6-474162_oprit_577-283-727-375_1153_27.mp4
// - Timestamp
@@ -318,7 +336,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
var cursorError error
var pkt av.Packet
var nextPkt av.Packet
recordingCursor := queue.Oldest()
recordingCursor := queue.DelayedGopCount(int(config.Capture.PreRecording))
if cursorError == nil {
pkt, cursorError = recordingCursor.ReadPacket()
@@ -345,7 +363,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
log.Log.Info("HandleRecordStream: closing recording (timestamp: " + strconv.FormatInt(timestamp, 10) + ", recordingPeriod: " + strconv.FormatInt(recordingPeriod, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
break
}
if pkt.IsKeyFrame && !start {
if pkt.IsKeyFrame && !start && pkt.Time >= lastDuration {
log.Log.Info("HandleRecordStream: write frames")
start = true
}
@@ -372,6 +390,9 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
myMuxer.WriteTrailerWithPacket(nextPkt)
log.Log.Info("HandleRecordStream: file save: " + name)
lastDuration = pkt.Time
lastRecordingTime = time.Now().Unix()
// Cleanup muxer
myMuxer.Close()
myMuxer = nil

View File

@@ -221,6 +221,10 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
isEnterprise = true
}
// Congert to string
macs, _ := json.Marshal(system.MACs)
ips, _ := json.Marshal(system.IPs)
var object = fmt.Sprintf(`{
"key" : "%s",
"version" : "3.0.0",
@@ -235,8 +239,8 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
"totalMemory" : "%d",
"usedMemory" : "%d",
"freeMemory" : "%d",
"macs" : "%v",
"ips" : "%v",
"macs" : %s,
"ips" : %s,
"board" : "",
"disk1size" : "%s",
"disk3size" : "%s",
@@ -250,7 +254,7 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
"docker" : true,
"kios" : false,
"raspberrypi" : false
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.MACs, system.IPs, "0", "0", "0", uptimeString, config.HubSite, onvifEnabled)
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, macs, ips, "0", "0", "0", uptimeString, config.HubSite, onvifEnabled)
var jsonStr = []byte(object)
buffy := bytes.NewBuffer(jsonStr)
@@ -308,51 +312,59 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
config := configuration.Config
// If offline made is enabled, we will stop the thread.
if config.Offline == "true" {
log.Log.Debug("HandleLiveStreamSD: stopping as Offline is enabled.")
} else {
// Allocate frame
frame := ffmpeg.AllocVideoFrame()
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
key := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
key = config.KStorage.CloudKey
}
// This is the new way ;)
if config.HubKey != "" {
key = config.HubKey
}
// Allocate frame
frame := ffmpeg.AllocVideoFrame()
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
lastLivestreamRequest := int64(0)
var cursorError error
var pkt av.Packet
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
continue
key := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
key = config.KStorage.CloudKey
}
now := time.Now().Unix()
select {
case <-communication.HandleLiveSD:
lastLivestreamRequest = now
default:
// This is the new way ;)
if config.HubKey != "" {
key = config.HubKey
}
if now-lastLivestreamRequest > 3 {
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
}
// Cleanup the frame.
frame.Free()
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
lastLivestreamRequest := int64(0)
var cursorError error
var pkt av.Packet
for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
continue
}
now := time.Now().Unix()
select {
case <-communication.HandleLiveSD:
lastLivestreamRequest = now
default:
}
if now-lastLivestreamRequest > 3 {
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
}
// Cleanup the frame.
frame.Free()
} else {
log.Log.Debug("HandleLiveStreamSD: stopping as Liveview is disabled.")
}
}
log.Log.Debug("HandleLiveStreamSD: finished")
@@ -375,34 +387,41 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Debug("HandleLiveStreamHD: stopping as Offline is enabled.")
} else {
// Should create a track here.
track := webrtc.NewVideoTrack()
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
track := webrtc.NewVideoTrack()
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
if config.Capture.ForwardWebRTC == "true" {
// We get a request with an offer, but we'll forward it.
for m := range communication.HandleLiveHDHandshake {
// Forward SDP
m.CloudKey = config.Key
request, err := json.Marshal(m)
if err == nil {
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
}
}
} else {
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.Cuuid
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string, 30)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
if config.Capture.ForwardWebRTC == "true" {
// We get a request with an offer, but we'll forward it.
for m := range communication.HandleLiveHDHandshake {
// Forward SDP
m.CloudKey = config.Key
request, err := json.Marshal(m)
if err == nil {
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
}
}
} else {
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.Cuuid
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string, 30)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
}
log.Log.Debug("HandleLiveStreamHD: stopping as Liveview is disabled.")
}
}
}

View File

@@ -244,6 +244,15 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
case "AGENT_CAPTURE_CONTINUOUS":
configuration.Config.Capture.Continuous = value
break
case "AGENT_CAPTURE_LIVEVIEW":
configuration.Config.Capture.Liveview = value
break
case "AGENT_CAPTURE_MOTION":
configuration.Config.Capture.Motion = value
break
case "AGENT_CAPTURE_SNAPSHOTS":
configuration.Config.Capture.Snapshots = value
break
case "AGENT_CAPTURE_PRERECORDING":
duration, err := strconv.ParseInt(value, 10, 64)
if err == nil {

View File

@@ -131,14 +131,14 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
// processed by the different consumers: motion detection, recording, etc.
queue = pubsub.NewQueue()
communication.Queue = queue
queue.SetMaxGopCount(int(config.Capture.PreRecording)) // GOP time frame is set to prerecording.
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)))
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
queue.WriteHeader(streams)
// We might have a substream, if so we'll create a seperate queue.
var subQueue *pubsub.Queue
if subStreamEnabled {
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(config.Capture.PreRecording)))
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(1)))
subQueue = pubsub.NewQueue()
subQueue.SetMaxGopCount(1)
subQueue.WriteHeader(subStreams)

View File

@@ -39,12 +39,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
pixelThreshold = 150
}
if config.Capture.Recording == "false" {
// We might later add the option to still detect motion, but not record.
log.Log.Info("ProcessMotion: Recording disabled, so we do not need motion detection either.")
} else if config.Capture.Continuous == "true" {
if config.Capture.Continuous == "true" {
log.Log.Info("ProcessMotion: Continuous recording, so no motion detection.")
@@ -134,23 +129,25 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
}
// Store snapshots (jpg) for hull.
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil {
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if config.Capture.Snapshots != "false" {
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
if len(files) > 3 {
os.Remove("./data/snapshots/" + files[0].Name())
}
// Save image
t := strconv.FormatInt(time.Now().Unix(), 10)
f, err := os.Create("./data/snapshots/" + t + ".jpg")
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
f.Close()
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
if len(files) > 3 {
os.Remove("./data/snapshots/" + files[0].Name())
}
// Save image
t := strconv.FormatInt(time.Now().Unix(), 10)
f, err := os.Create("./data/snapshots/" + t + ".jpg")
if err == nil {
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
f.Close()
}
}
}
}
@@ -178,26 +175,29 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
}
}
// Remember additional information about the result of findmotion
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
if config.Capture.Motion != "false" {
if detectMotion && isPixelChangeThresholdReached {
// Remember additional information about the result of findmotion
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
if detectMotion && isPixelChangeThresholdReached {
if mqttClient != nil {
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
if mqttClient != nil {
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
}
if config.Capture.Recording != "false" {
dataToPass := models.MotionDataPartial{
Timestamp: time.Now().Unix(),
NumberOfChanges: changesToReturn,
}
communication.HandleMotion <- dataToPass //Save data to the channel
}
}
//FIXME: In the future MotionDataPartial should be replaced with MotionDataFull
dataToPass := models.MotionDataPartial{
Timestamp: time.Now().Unix(),
NumberOfChanges: changesToReturn,
}
communication.HandleMotion <- dataToPass //Save data to the channel
imageArray[0] = imageArray[1]
imageArray[1] = imageArray[2]
i++
}
imageArray[0] = imageArray[1]
imageArray[1] = imageArray[2]
i++
}
if img != nil {
@@ -239,7 +239,6 @@ func GetRawImage(frame *ffmpeg.VideoFrame, pkt av.Packet, dec *ffmpeg.VideoDecod
func ImageToBytes(img image.Image) ([]byte, error) {
buffer := new(bytes.Buffer)
w := bufio.NewWriter(buffer)
//err := jpeg.Encode(w, img, &jpeg.EncoderOptions{Quality: 70})
err := jpeg.Encode(w, img, &jpeg.Options{Quality: 15})
return buffer.Bytes(), err
}

View File

@@ -51,6 +51,9 @@ type Capture struct {
RaspiCamera RaspiCamera `json:"raspicamera"`
Recording string `json:"recording,omitempty"`
Continuous string `json:"continuous,omitempty"`
Snapshots string `json:"snapshots,omitempty"`
Motion string `json:"motion,omitempty"`
Liveview string `json:"liveview,omitempty"`
PostRecording int64 `json:"postrecording"`
PreRecording int64 `json:"prerecording"`
MaxLengthRecording int64 `json:"maxlengthrecording"`

View File

@@ -253,7 +253,7 @@ func CreateFragmentedMP4(fullName string, fragmentedDuration int64) {
path, _ := os.Getwd()
duration := fragmentedDuration * 1000
// This timescale is crucial, as it should be the same as the one defined in JOY4.
cmd := exec.Command("mp4fragment", "--timescale", "10000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
cmd := exec.Command("mp4fragment", "--timescale", "10000000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
cmd.Dir = path
log.Log.Info(cmd.String())
var out bytes.Buffer