mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 08:50:08 +00:00
Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fa9538320 | ||
|
|
943e81000e | ||
|
|
36b93a34b4 | ||
|
|
b0d2409524 | ||
|
|
be7a231950 | ||
|
|
31a0b9efa4 | ||
|
|
d70a3ed343 | ||
|
|
56cebb6451 | ||
|
|
99f61bc5e8 | ||
|
|
a5d02e3275 | ||
|
|
354ab7db05 | ||
|
|
dc817f8c26 | ||
|
|
a90097731c | ||
|
|
5b3bbbb37e | ||
|
|
4a4aabd71c | ||
|
|
b058c1e742 | ||
|
|
7671b1c2c3 | ||
|
|
4cc8135e1a | ||
|
|
3cb38099ea | ||
|
|
deb0308dc4 | ||
|
|
24c729eea3 | ||
|
|
c59d511ea3 | ||
|
|
6f8745dc3a | ||
|
|
65d3d649b9 | ||
|
|
b4a8028c04 | ||
|
|
9d7077813a | ||
|
|
2feda33808 | ||
|
|
ec42b9ea85 | ||
|
|
a2b4ee12ec | ||
|
|
a0f99a5167 | ||
|
|
9aff467afc |
@@ -1,2 +1,2 @@
|
||||
FROM kerberos/devcontainer:9da0ee3
|
||||
FROM kerberos/devcontainer:b2bc659
|
||||
LABEL AUTHOR=Kerberos.io
|
||||
|
||||
36
README.md
36
README.md
@@ -210,6 +210,42 @@ After a few minutes, you will see a beautiful `Visual Studio Code` shown in your
|
||||
|
||||

|
||||
|
||||
On opening of the GitHub Codespace, some dependencies will be installed. Once this is done go ahead to the `ui/src/config.json` file, and (un)comment following section. Make sure to replace the `externalHost` variable with the DNS name you will retrieve from the next step.
|
||||
|
||||
// Uncomment this when using codespaces or other special DNS names (which you can't control)
|
||||
// replace this with the DNS name of the kerberos agent server (the codespace url)
|
||||
const externalHost = 'cedricve-automatic-computing-machine-v647rxvj4whx9qp-80.preview.app.github.dev';
|
||||
|
||||
const dev = {
|
||||
ENV: 'dev',
|
||||
HOSTNAME: externalHost,
|
||||
//API_URL: `${protocol}//${hostname}:8080/api`,
|
||||
//URL: `${protocol}//${hostname}:8080`,
|
||||
//WS_URL: `${websocketprotocol}//${hostname}:8080/ws`,
|
||||
|
||||
// Uncomment, and comment the above lines, when using codespaces or other special DNS names (which you can't control)
|
||||
API_URL: `${protocol}//${externalHost}/api`,
|
||||
URL: `${protocol}//${externalHost}`,
|
||||
WS_URL: `${websocketprotocol}//${externalHost}/ws`,
|
||||
};
|
||||
|
||||
Go and open two terminals one for the `ui` project and one for the `machinery` project.
|
||||
|
||||
1. Terminal A:
|
||||
|
||||
cd machinery/
|
||||
go run main.go run camera 80
|
||||
|
||||
2. Terminal B:
|
||||
|
||||
cd ui/
|
||||
yarn start
|
||||
|
||||
Once executed, a popup will show up mentioning `portforwarding`. You should see two ports being opened, one for the ui `3000` and one for the machinery `80`. `Right-click` on the port `80` and change visibility from `private` to `public`, this is required to avoid `CORS` errors.
|
||||
|
||||
As mentioned above, copy the hostname of the `machinery` DNS name, and past it in the `ui/src/config.json` file. Once done reload, the `ui` page in your browser, and you should be able to access the login page with the default credentials `root` and `root`.
|
||||
|
||||
|
||||
## Develop and build
|
||||
|
||||
Kerberos Agent is divided in two parts a `machinery` and `web`. Both parts live in this repository in their relative folders. For development or running the application on your local machine, you have to run both the `machinery` and the `web` as described below. When running in production everything is shipped as only one artifact, read more about this at [Building for production](#building-for-production).
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
{
|
||||
"type": "",
|
||||
"key": "",
|
||||
|
||||
@@ -2,7 +2,8 @@ module github.com/kerberos-io/agent/machinery
|
||||
|
||||
go 1.19
|
||||
|
||||
//replace github.com/kerberos-io/joy4 v1.0.48 => ../../../../github.com/kerberos-io/joy4
|
||||
//replace github.com/kerberos-io/joy4 v1.0.54 => ../../../../github.com/kerberos-io/joy4
|
||||
|
||||
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif
|
||||
|
||||
require (
|
||||
@@ -20,13 +21,12 @@ require (
|
||||
github.com/golang-module/carbon/v2 v2.2.3
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.50
|
||||
github.com/kerberos-io/joy4 v1.0.55
|
||||
github.com/kerberos-io/onvif v0.0.5
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/pion/webrtc/v3 v3.1.50
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
github.com/swaggo/files v1.0.0
|
||||
github.com/swaggo/gin-swagger v1.5.3
|
||||
@@ -58,7 +58,6 @@ require (
|
||||
github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae // indirect
|
||||
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
github.com/go-openapi/jsonreference v0.19.6 // indirect
|
||||
github.com/go-openapi/spec v0.20.4 // indirect
|
||||
@@ -110,7 +109,6 @@ require (
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/tinylib/msgp v1.1.6 // indirect
|
||||
github.com/ugorji/go/codec v1.2.7 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
github.com/ziutek/mymysql v1.5.4 // indirect
|
||||
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
|
||||
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
|
||||
|
||||
@@ -93,8 +93,6 @@ github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv
|
||||
github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk=
|
||||
github.com/gin-gonic/gin v1.8.2 h1:UzKToD9/PoFj/V4rvlKqTRKnQYyz8Sc1MJlv4JHPtvY=
|
||||
github.com/gin-gonic/gin v1.8.2/go.mod h1:qw5AYuDrzRTnhvusDsrov+fDIxp9Dleuu12h8nfB398=
|
||||
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
|
||||
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
|
||||
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
|
||||
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
|
||||
@@ -177,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.50 h1:N1qr0Q6ytZPG5ZmG1hDVXWeRQ7jzM7f5QftDQ/KQVCo=
|
||||
github.com/kerberos-io/joy4 v1.0.50/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/joy4 v1.0.55 h1:P5RISBp8kUowgb/bvqLPVKPJL9n9jI/wXBCLs+XFMWg=
|
||||
github.com/kerberos-io/joy4 v1.0.55/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
|
||||
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=
|
||||
@@ -310,8 +308,6 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ
|
||||
github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOamKUA/eNW3xUrntHT9L4W89W1nfj43U=
|
||||
github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE=
|
||||
github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs=
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
|
||||
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
@@ -366,8 +362,6 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
|
||||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
|
||||
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
|
||||
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
|
||||
@@ -438,7 +432,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
||||
@@ -25,7 +25,19 @@ func OpenRTSP(url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
return nil, []av.CodecData{}, err
|
||||
}
|
||||
|
||||
func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
|
||||
func GetVideoStream(streams []av.CodecData) (av.CodecData, error) {
|
||||
var videoStream av.CodecData
|
||||
for _, stream := range streams {
|
||||
if stream.Type().IsAudio() {
|
||||
//astream := stream.(av.AudioCodecData)
|
||||
} else if stream.Type().IsVideo() {
|
||||
videoStream = stream
|
||||
}
|
||||
}
|
||||
return videoStream, nil
|
||||
}
|
||||
|
||||
func GetVideoDecoder(decoder *ffmpeg.VideoDecoder, streams []av.CodecData) {
|
||||
// Load video codec
|
||||
var vstream av.VideoCodecData
|
||||
for _, stream := range streams {
|
||||
@@ -35,17 +47,73 @@ func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
|
||||
vstream = stream.(av.VideoCodecData)
|
||||
}
|
||||
}
|
||||
dec, _ := ffmpeg.NewVideoDecoder(vstream)
|
||||
return dec
|
||||
err := ffmpeg.NewVideoDecoder(decoder, vstream)
|
||||
if err != nil {
|
||||
log.Log.Error("GetVideoDecoder: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeImage(pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
|
||||
func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
|
||||
decoderMutex.Lock()
|
||||
img, err := decoder.Decode(pkt.Data)
|
||||
img, err := decoder.Decode(frame, pkt.Data)
|
||||
decoderMutex.Unlock()
|
||||
return img, err
|
||||
}
|
||||
|
||||
func GetStreamInsights(infile av.DemuxCloser, streams []av.CodecData) (int, int, int, int) {
|
||||
var width, height, fps, gopsize int
|
||||
for _, stream := range streams {
|
||||
if stream.Type().IsAudio() {
|
||||
//astream := stream.(av.AudioCodecData)
|
||||
} else if stream.Type().IsVideo() {
|
||||
vstream := stream.(av.VideoCodecData)
|
||||
width = vstream.Width()
|
||||
height = vstream.Height()
|
||||
}
|
||||
}
|
||||
|
||||
loop:
|
||||
for timeout := time.After(1 * time.Second); ; {
|
||||
var err error
|
||||
if _, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
|
||||
log.Log.Error("HandleStream: " + err.Error())
|
||||
}
|
||||
fps++
|
||||
select {
|
||||
case <-timeout:
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
gopCounter := 0
|
||||
start := false
|
||||
for {
|
||||
var pkt av.Packet
|
||||
var err error
|
||||
if pkt, err = infile.ReadPacket(); err != nil { // sometimes this throws an end of file..
|
||||
log.Log.Error("HandleStream: " + err.Error())
|
||||
}
|
||||
// Could be that a decode is throwing errors.
|
||||
if len(pkt.Data) > 0 {
|
||||
if start {
|
||||
gopCounter = gopCounter + 1
|
||||
}
|
||||
|
||||
if pkt.IsKeyFrame {
|
||||
if start == false {
|
||||
start = true
|
||||
} else {
|
||||
gopsize = gopCounter
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return width, height, fps, gopsize
|
||||
}
|
||||
|
||||
func HandleStream(infile av.DemuxCloser, queue *pubsub.Queue, communication *models.Communication) { //, wg *sync.WaitGroup) {
|
||||
|
||||
log.Log.Debug("HandleStream: started")
|
||||
|
||||
@@ -272,12 +272,30 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
|
||||
var file *os.File
|
||||
var err error
|
||||
|
||||
var lastDuration time.Duration
|
||||
var lastRecordingTime int64
|
||||
|
||||
for motion := range communication.HandleMotion {
|
||||
|
||||
timestamp = time.Now().Unix()
|
||||
startRecording = time.Now().Unix() // we mark the current time when the record started.
|
||||
numberOfChanges := motion.NumberOfChanges
|
||||
|
||||
// If we have prerecording we will substract the number of seconds.
|
||||
// Taking into account FPS = GOP size (Keyfram interval)
|
||||
if config.Capture.PreRecording > 0 {
|
||||
|
||||
// Might be that recordings are coming short after each other.
|
||||
// Therefore we do some math with the current time and the last recording time.
|
||||
|
||||
timeBetweenNowAndLastRecording := startRecording - lastRecordingTime
|
||||
if timeBetweenNowAndLastRecording > int64(config.Capture.PreRecording) {
|
||||
startRecording = startRecording - int64(config.Capture.PreRecording) + 1
|
||||
} else {
|
||||
startRecording = startRecording - timeBetweenNowAndLastRecording
|
||||
}
|
||||
}
|
||||
|
||||
// timestamp_microseconds_instanceName_regionCoordinates_numberOfChanges_token
|
||||
// 1564859471_6-474162_oprit_577-283-727-375_1153_27.mp4
|
||||
// - Timestamp
|
||||
@@ -318,7 +336,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
|
||||
var cursorError error
|
||||
var pkt av.Packet
|
||||
var nextPkt av.Packet
|
||||
recordingCursor := queue.Oldest()
|
||||
recordingCursor := queue.DelayedGopCount(int(config.Capture.PreRecording))
|
||||
|
||||
if cursorError == nil {
|
||||
pkt, cursorError = recordingCursor.ReadPacket()
|
||||
@@ -345,7 +363,7 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
|
||||
log.Log.Info("HandleRecordStream: closing recording (timestamp: " + strconv.FormatInt(timestamp, 10) + ", recordingPeriod: " + strconv.FormatInt(recordingPeriod, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
|
||||
break
|
||||
}
|
||||
if pkt.IsKeyFrame && !start {
|
||||
if pkt.IsKeyFrame && !start && pkt.Time >= lastDuration {
|
||||
log.Log.Info("HandleRecordStream: write frames")
|
||||
start = true
|
||||
}
|
||||
@@ -372,6 +390,9 @@ func HandleRecordStream(queue *pubsub.Queue, configuration *models.Configuration
|
||||
myMuxer.WriteTrailerWithPacket(nextPkt)
|
||||
log.Log.Info("HandleRecordStream: file save: " + name)
|
||||
|
||||
lastDuration = pkt.Time
|
||||
lastRecordingTime = time.Now().Unix()
|
||||
|
||||
// Cleanup muxer
|
||||
myMuxer.Close()
|
||||
myMuxer = nil
|
||||
|
||||
@@ -95,6 +95,9 @@ func GetSystemInfo() (models.System, error) {
|
||||
var usedMem uint64 = 0
|
||||
var totalMem uint64 = 0
|
||||
var freeMem uint64 = 0
|
||||
|
||||
var processUsedMem uint64 = 0
|
||||
|
||||
architecture := ""
|
||||
cpuId := ""
|
||||
KernelVersion := ""
|
||||
@@ -133,18 +136,27 @@ func GetSystemInfo() (models.System, error) {
|
||||
}
|
||||
}
|
||||
|
||||
process, err := sysinfo.Self()
|
||||
if err == nil {
|
||||
memInfo, err := process.Memory()
|
||||
if err == nil {
|
||||
processUsedMem = memInfo.Resident
|
||||
}
|
||||
}
|
||||
|
||||
system := models.System{
|
||||
Hostname: hostname,
|
||||
CPUId: cpuId,
|
||||
KernelVersion: KernelVersion,
|
||||
Version: agentVersion,
|
||||
MACs: MACs,
|
||||
IPs: IPs,
|
||||
BootTime: uint64(bootTime.Unix()),
|
||||
Architecture: architecture,
|
||||
UsedMemory: usedMem,
|
||||
TotalMemory: totalMem,
|
||||
FreeMemory: freeMem,
|
||||
Hostname: hostname,
|
||||
CPUId: cpuId,
|
||||
KernelVersion: KernelVersion,
|
||||
Version: agentVersion,
|
||||
MACs: MACs,
|
||||
IPs: IPs,
|
||||
BootTime: uint64(bootTime.Unix()),
|
||||
Architecture: architecture,
|
||||
UsedMemory: usedMem,
|
||||
TotalMemory: totalMem,
|
||||
FreeMemory: freeMem,
|
||||
ProcessUsedMemory: processUsedMem,
|
||||
}
|
||||
|
||||
return system, nil
|
||||
@@ -200,6 +212,11 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
|
||||
uptimeString := carbon.Parse(uptimeFormatted).DiffForHumans()
|
||||
uptimeString = strings.ReplaceAll(uptimeString, "ago", "")
|
||||
|
||||
// Do the same for boottime
|
||||
bootTimeFormatted := time.Unix(int64(system.BootTime), 0).Format("2006-01-02 15:04:05")
|
||||
boottimeString := carbon.Parse(bootTimeFormatted).DiffForHumans()
|
||||
boottimeString = strings.ReplaceAll(boottimeString, "ago", "")
|
||||
|
||||
// We'll check which mode is enabled for the camera.
|
||||
onvifEnabled := "false"
|
||||
if config.Capture.IPCamera.ONVIFXAddr != "" {
|
||||
@@ -221,6 +238,10 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
|
||||
isEnterprise = true
|
||||
}
|
||||
|
||||
// Congert to string
|
||||
macs, _ := json.Marshal(system.MACs)
|
||||
ips, _ := json.Marshal(system.IPs)
|
||||
|
||||
var object = fmt.Sprintf(`{
|
||||
"key" : "%s",
|
||||
"version" : "3.0.0",
|
||||
@@ -235,13 +256,15 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
|
||||
"totalMemory" : "%d",
|
||||
"usedMemory" : "%d",
|
||||
"freeMemory" : "%d",
|
||||
"macs" : "%v",
|
||||
"ips" : "%v",
|
||||
"processMemory" : "%d",
|
||||
"mac_list" : %s,
|
||||
"ip_list" : %s,
|
||||
"board" : "",
|
||||
"disk1size" : "%s",
|
||||
"disk3size" : "%s",
|
||||
"diskvdasize" : "%s",
|
||||
"uptime" : "%s",
|
||||
"boot_time" : "%s",
|
||||
"siteID" : "%s",
|
||||
"onvif" : "%s",
|
||||
"numberoffiles" : "33",
|
||||
@@ -250,7 +273,7 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
|
||||
"docker" : true,
|
||||
"kios" : false,
|
||||
"raspberrypi" : false
|
||||
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.MACs, system.IPs, "0", "0", "0", uptimeString, config.HubSite, onvifEnabled)
|
||||
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled)
|
||||
|
||||
var jsonStr = []byte(object)
|
||||
buffy := bytes.NewBuffer(jsonStr)
|
||||
@@ -308,59 +331,71 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
|
||||
|
||||
config := configuration.Config
|
||||
|
||||
// If offline made is enabled, we will stop the thread.
|
||||
if config.Offline == "true" {
|
||||
log.Log.Debug("HandleLiveStreamSD: stopping as Offline is enabled.")
|
||||
} else {
|
||||
|
||||
key := ""
|
||||
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
|
||||
key = config.S3.Publickey
|
||||
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
|
||||
key = config.KStorage.CloudKey
|
||||
}
|
||||
// This is the new way ;)
|
||||
if config.HubKey != "" {
|
||||
key = config.HubKey
|
||||
}
|
||||
// Check if we need to enable the live stream
|
||||
if config.Capture.Liveview != "false" {
|
||||
|
||||
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
|
||||
// Allocate frame
|
||||
frame := ffmpeg.AllocVideoFrame()
|
||||
|
||||
lastLivestreamRequest := int64(0)
|
||||
|
||||
var cursorError error
|
||||
var pkt av.Packet
|
||||
|
||||
for cursorError == nil {
|
||||
pkt, cursorError = livestreamCursor.ReadPacket()
|
||||
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
|
||||
continue
|
||||
key := ""
|
||||
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
|
||||
key = config.S3.Publickey
|
||||
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
|
||||
key = config.KStorage.CloudKey
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
select {
|
||||
case <-communication.HandleLiveSD:
|
||||
lastLivestreamRequest = now
|
||||
default:
|
||||
// This is the new way ;)
|
||||
if config.HubKey != "" {
|
||||
key = config.HubKey
|
||||
}
|
||||
if now-lastLivestreamRequest > 3 {
|
||||
continue
|
||||
|
||||
topic := "kerberos/" + key + "/device/" + config.Key + "/live"
|
||||
|
||||
lastLivestreamRequest := int64(0)
|
||||
|
||||
var cursorError error
|
||||
var pkt av.Packet
|
||||
|
||||
for cursorError == nil {
|
||||
pkt, cursorError = livestreamCursor.ReadPacket()
|
||||
if len(pkt.Data) == 0 || !pkt.IsKeyFrame {
|
||||
continue
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
select {
|
||||
case <-communication.HandleLiveSD:
|
||||
lastLivestreamRequest = now
|
||||
default:
|
||||
}
|
||||
if now-lastLivestreamRequest > 3 {
|
||||
continue
|
||||
}
|
||||
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
|
||||
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
|
||||
}
|
||||
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
|
||||
sendImage(topic, mqttClient, pkt, decoder, decoderMutex)
|
||||
|
||||
// Cleanup the frame.
|
||||
frame.Free()
|
||||
|
||||
} else {
|
||||
log.Log.Debug("HandleLiveStreamSD: stopping as Liveview is disabled.")
|
||||
}
|
||||
}
|
||||
|
||||
log.Log.Debug("HandleLiveStreamSD: finished")
|
||||
}
|
||||
|
||||
func sendImage(topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
img, err := computervision.GetRawImage(pkt, decoder, decoderMutex)
|
||||
func sendImage(frame *ffmpeg.VideoFrame, topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
_, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
bytes, _ := computervision.ImageToBytes(&img.Image)
|
||||
bytes, _ := computervision.ImageToBytes(&frame.Image)
|
||||
encoded := base64.StdEncoding.EncodeToString(bytes)
|
||||
mqttClient.Publish(topic, 0, false, encoded)
|
||||
}
|
||||
// Cleanup the image.
|
||||
img.Free()
|
||||
}
|
||||
|
||||
func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, codecs []av.CodecData, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
@@ -371,34 +406,41 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
|
||||
log.Log.Debug("HandleLiveStreamHD: stopping as Offline is enabled.")
|
||||
} else {
|
||||
|
||||
// Should create a track here.
|
||||
track := webrtc.NewVideoTrack()
|
||||
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
|
||||
// Check if we need to enable the live stream
|
||||
if config.Capture.Liveview != "false" {
|
||||
|
||||
// Should create a track here.
|
||||
track := webrtc.NewVideoTrack()
|
||||
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, track, codecs, decoder, decoderMutex)
|
||||
|
||||
if config.Capture.ForwardWebRTC == "true" {
|
||||
// We get a request with an offer, but we'll forward it.
|
||||
for m := range communication.HandleLiveHDHandshake {
|
||||
// Forward SDP
|
||||
m.CloudKey = config.Key
|
||||
request, err := json.Marshal(m)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
|
||||
for handshake := range communication.HandleLiveHDHandshake {
|
||||
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
|
||||
key := config.Key + "/" + handshake.Cuuid
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
_, ok := webrtc.CandidateArrays[key]
|
||||
if !ok {
|
||||
webrtc.CandidateArrays[key] = make(chan string, 30)
|
||||
}
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
|
||||
|
||||
if config.Capture.ForwardWebRTC == "true" {
|
||||
// We get a request with an offer, but we'll forward it.
|
||||
for m := range communication.HandleLiveHDHandshake {
|
||||
// Forward SDP
|
||||
m.CloudKey = config.Key
|
||||
request, err := json.Marshal(m)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
|
||||
for handshake := range communication.HandleLiveHDHandshake {
|
||||
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
|
||||
key := config.Key + "/" + handshake.Cuuid
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
_, ok := webrtc.CandidateArrays[key]
|
||||
if !ok {
|
||||
webrtc.CandidateArrays[key] = make(chan string, 30)
|
||||
}
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, track, handshake, webrtc.CandidateArrays[key])
|
||||
|
||||
}
|
||||
log.Log.Debug("HandleLiveStreamHD: stopping as Liveview is disabled.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -42,24 +40,6 @@ func GetImageFromFilePath() (image.Image, error) {
|
||||
return nil, errors.New("Could not find a snapshot in " + snapshotDirectory)
|
||||
}
|
||||
|
||||
func GetSnapshot() string {
|
||||
var snapshot string
|
||||
files, err := ioutil.ReadDir("./data/snapshots")
|
||||
if err == nil && len(files) > 1 {
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].ModTime().Before(files[j].ModTime())
|
||||
})
|
||||
f, _ := os.Open("./data/snapshots/" + files[1].Name())
|
||||
defer f.Close()
|
||||
// Read entire JPG into byte slice.
|
||||
reader := bufio.NewReader(f)
|
||||
content, _ := ioutil.ReadAll(reader)
|
||||
// Encode as base64.
|
||||
snapshot = base64.StdEncoding.EncodeToString(content)
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
|
||||
// ReadUserConfig Reads the user configuration of the Kerberos Open Source instance.
|
||||
// This will return a models.User struct including the username, password,
|
||||
// selected language, and if the installation was completed or not.
|
||||
@@ -146,6 +126,9 @@ func OpenConfig(configuration *models.Configuration) {
|
||||
conjungo.Merge(&s3, configuration.CustomConfig.S3, opts)
|
||||
configuration.Config.S3 = &s3
|
||||
|
||||
// Cleanup
|
||||
opts = nil
|
||||
|
||||
} else if os.Getenv("DEPLOYMENT") == "" || os.Getenv("DEPLOYMENT") == "agent" {
|
||||
|
||||
// Local deployment means we do a stand-alone installation
|
||||
@@ -244,6 +227,15 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
|
||||
case "AGENT_CAPTURE_CONTINUOUS":
|
||||
configuration.Config.Capture.Continuous = value
|
||||
break
|
||||
case "AGENT_CAPTURE_LIVEVIEW":
|
||||
configuration.Config.Capture.Liveview = value
|
||||
break
|
||||
case "AGENT_CAPTURE_MOTION":
|
||||
configuration.Config.Capture.Motion = value
|
||||
break
|
||||
case "AGENT_CAPTURE_SNAPSHOTS":
|
||||
configuration.Config.Capture.Snapshots = value
|
||||
break
|
||||
case "AGENT_CAPTURE_PRERECORDING":
|
||||
duration, err := strconv.ParseInt(value, 10, 64)
|
||||
if err == nil {
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/cloud"
|
||||
"github.com/kerberos-io/agent/machinery/src/computervision"
|
||||
@@ -15,7 +18,6 @@ import (
|
||||
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
|
||||
"github.com/kerberos-io/joy4/av"
|
||||
"github.com/kerberos-io/joy4/av/pubsub"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
@@ -57,12 +59,17 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
// do several checks to see if the agent is still operational.
|
||||
go ControlAgent(communication)
|
||||
|
||||
// Create some global variables
|
||||
decoder := &ffmpeg.VideoDecoder{}
|
||||
subDecoder := &ffmpeg.VideoDecoder{}
|
||||
cameraSettings := &models.Camera{}
|
||||
|
||||
// Run the agent and fire up all the other
|
||||
// goroutines which do image capture, motion detection, onvif, etc.
|
||||
|
||||
for {
|
||||
// This will blocking until receiving a signal to be restarted, reconfigured, stopped, etc.
|
||||
status := RunAgent(configuration, communication, uptimeStart)
|
||||
status := RunAgent(configuration, communication, uptimeStart, cameraSettings, decoder, subDecoder)
|
||||
if status == "stop" {
|
||||
break
|
||||
}
|
||||
@@ -72,7 +79,7 @@ func Bootstrap(configuration *models.Configuration, communication *models.Commun
|
||||
log.Log.Debug("Bootstrap: finished")
|
||||
}
|
||||
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time) string {
|
||||
func RunAgent(configuration *models.Configuration, communication *models.Communication, uptimeStart time.Time, cameraSettings *models.Camera, decoder *ffmpeg.VideoDecoder, subDecoder *ffmpeg.VideoDecoder) string {
|
||||
log.Log.Debug("RunAgent: started")
|
||||
|
||||
config := configuration.Config
|
||||
@@ -84,6 +91,10 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
infile, streams, err := capture.OpenRTSP(rtspUrl)
|
||||
|
||||
var queue *pubsub.Queue
|
||||
var subQueue *pubsub.Queue
|
||||
|
||||
var decoderMutex sync.Mutex
|
||||
var subDecoderMutex sync.Mutex
|
||||
|
||||
status := "not started"
|
||||
|
||||
@@ -104,15 +115,42 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
}
|
||||
}
|
||||
|
||||
// At some routines we will need to decode the image.
|
||||
// Make sure its properly locked as we only have a single decoder.
|
||||
var decoderMutex sync.Mutex
|
||||
var subDecoderMutex sync.Mutex
|
||||
decoder := capture.GetVideoDecoder(streams)
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
// to reload the decoders.
|
||||
videoStream, _ := capture.GetVideoStream(streams)
|
||||
num, denum := videoStream.(av.VideoCodecData).Framerate()
|
||||
width := videoStream.(av.VideoCodecData).Width()
|
||||
height := videoStream.(av.VideoCodecData).Height()
|
||||
|
||||
var subDecoder *ffmpeg.VideoDecoder
|
||||
if subStreamEnabled {
|
||||
subDecoder = capture.GetVideoDecoder(subStreams)
|
||||
if cameraSettings.RTSP != rtspUrl || cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height || cameraSettings.Num != num || cameraSettings.Denum != denum || cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
|
||||
|
||||
if cameraSettings.Initialized {
|
||||
decoder.Close()
|
||||
if subStreamEnabled {
|
||||
subDecoder.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// At some routines we will need to decode the image.
|
||||
// Make sure its properly locked as we only have a single decoder.
|
||||
log.Log.Info("RunAgent: camera settings changed, reloading decoder")
|
||||
capture.GetVideoDecoder(decoder, streams)
|
||||
if subStreamEnabled {
|
||||
capture.GetVideoDecoder(subDecoder, subStreams)
|
||||
}
|
||||
|
||||
cameraSettings.RTSP = rtspUrl
|
||||
cameraSettings.SubRTSP = subRtspUrl
|
||||
cameraSettings.Width = width
|
||||
cameraSettings.Height = height
|
||||
cameraSettings.Framerate = float64(num) / float64(denum)
|
||||
cameraSettings.Num = num
|
||||
cameraSettings.Denum = denum
|
||||
cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
|
||||
cameraSettings.Initialized = true
|
||||
} else {
|
||||
log.Log.Info("RunAgent: camera settings did not change, keeping decoder")
|
||||
}
|
||||
|
||||
communication.Decoder = decoder
|
||||
@@ -131,15 +169,15 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
// processed by the different consumers: motion detection, recording, etc.
|
||||
queue = pubsub.NewQueue()
|
||||
communication.Queue = queue
|
||||
queue.SetMaxGopCount(int(config.Capture.PreRecording)) // GOP time frame is set to prerecording.
|
||||
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)))
|
||||
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
|
||||
log.Log.Info("RunAgent: SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
|
||||
queue.WriteHeader(streams)
|
||||
|
||||
// We might have a substream, if so we'll create a seperate queue.
|
||||
var subQueue *pubsub.Queue
|
||||
if subStreamEnabled {
|
||||
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(config.Capture.PreRecording)))
|
||||
log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(1)))
|
||||
subQueue = pubsub.NewQueue()
|
||||
communication.SubQueue = subQueue
|
||||
subQueue.SetMaxGopCount(1)
|
||||
subQueue.WriteHeader(subStreams)
|
||||
}
|
||||
@@ -214,22 +252,30 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
infile.Close()
|
||||
infile = nil
|
||||
queue.Close()
|
||||
queue = nil
|
||||
communication.Queue = nil
|
||||
if subStreamEnabled {
|
||||
subInfile.Close()
|
||||
subInfile = nil
|
||||
subQueue.Close()
|
||||
subQueue = nil
|
||||
communication.SubQueue = nil
|
||||
}
|
||||
close(communication.HandleONVIF)
|
||||
communication.HandleONVIF = nil
|
||||
close(communication.HandleLiveHDHandshake)
|
||||
communication.HandleLiveHDHandshake = nil
|
||||
close(communication.HandleMotion)
|
||||
routers.DisconnectMQTT(mqttClient)
|
||||
communication.HandleMotion = nil
|
||||
|
||||
// Disconnect MQTT
|
||||
routers.DisconnectMQTT(mqttClient, &configuration.Config)
|
||||
|
||||
// Wait a few seconds to stop the decoder.
|
||||
time.Sleep(time.Second * 3)
|
||||
decoder.Close()
|
||||
if subStreamEnabled {
|
||||
subDecoder.Close()
|
||||
}
|
||||
|
||||
// Waiting for some seconds to make sure everything is properly closed.
|
||||
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
|
||||
time.Sleep(time.Second * 3)
|
||||
@@ -240,6 +286,9 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi
|
||||
|
||||
log.Log.Debug("RunAgent: finished")
|
||||
|
||||
// Clean up, force garbage collection
|
||||
runtime.GC()
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
|
||||
@@ -3,12 +3,9 @@ package computervision
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"image"
|
||||
"image/jpeg"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -39,12 +36,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
pixelThreshold = 150
|
||||
}
|
||||
|
||||
if config.Capture.Recording == "false" {
|
||||
|
||||
// We might later add the option to still detect motion, but not record.
|
||||
log.Log.Info("ProcessMotion: Recording disabled, so we do not need motion detection either.")
|
||||
|
||||
} else if config.Capture.Continuous == "true" {
|
||||
if config.Capture.Continuous == "true" {
|
||||
|
||||
log.Log.Info("ProcessMotion: Continuous recording, so no motion detection.")
|
||||
|
||||
@@ -54,6 +46,9 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
|
||||
key := config.HubKey
|
||||
|
||||
// Allocate a VideoFrame
|
||||
frame := ffmpeg.AllocVideoFrame()
|
||||
|
||||
// Initialise first 2 elements
|
||||
var imageArray [3]*image.Gray
|
||||
|
||||
@@ -66,7 +61,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
pkt, cursorError = motionCursor.ReadPacket()
|
||||
// Check If valid package.
|
||||
if len(pkt.Data) > 0 && pkt.IsKeyFrame {
|
||||
grayImage, err := GetGrayImage(pkt, decoder, decoderMutex)
|
||||
grayImage, err := GetGrayImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
imageArray[j] = grayImage
|
||||
j++
|
||||
@@ -125,32 +120,14 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
continue
|
||||
}
|
||||
|
||||
grayImage, err := GetGrayImage(pkt, decoder, decoderMutex)
|
||||
grayImage, err := GetGrayImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
imageArray[2] = grayImage
|
||||
}
|
||||
|
||||
// Store snapshots (jpg) for hull.
|
||||
files, err := ioutil.ReadDir("./data/snapshots")
|
||||
if err == nil {
|
||||
rgbImage, err := GetRawImage(pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].ModTime().Before(files[j].ModTime())
|
||||
})
|
||||
if len(files) > 3 {
|
||||
os.Remove("./data/snapshots/" + files[0].Name())
|
||||
}
|
||||
|
||||
// Save image
|
||||
t := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
f, err := os.Create("./data/snapshots/" + t + ".jpg")
|
||||
if err == nil {
|
||||
jpeg.Encode(f, &rgbImage.Image, &jpeg.Options{Quality: 15})
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
rgbImage.Free()
|
||||
if config.Capture.Snapshots != "false" {
|
||||
StoreSnapshot(communication, frame, pkt, decoder, decoderMutex)
|
||||
}
|
||||
|
||||
// Check if within time interval
|
||||
@@ -176,32 +153,37 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
|
||||
}
|
||||
}
|
||||
|
||||
// Remember additional information about the result of findmotion
|
||||
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
|
||||
if config.Capture.Motion != "false" {
|
||||
|
||||
if detectMotion && isPixelChangeThresholdReached {
|
||||
// Remember additional information about the result of findmotion
|
||||
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
|
||||
if detectMotion && isPixelChangeThresholdReached {
|
||||
|
||||
if mqttClient != nil {
|
||||
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
if mqttClient != nil {
|
||||
mqttClient.Publish("kerberos/"+key+"/device/"+config.Key+"/motion", 2, false, "motion")
|
||||
}
|
||||
|
||||
if config.Capture.Recording != "false" {
|
||||
dataToPass := models.MotionDataPartial{
|
||||
Timestamp: time.Now().Unix(),
|
||||
NumberOfChanges: changesToReturn,
|
||||
}
|
||||
communication.HandleMotion <- dataToPass //Save data to the channel
|
||||
}
|
||||
}
|
||||
|
||||
//FIXME: In the future MotionDataPartial should be replaced with MotionDataFull
|
||||
dataToPass := models.MotionDataPartial{
|
||||
Timestamp: time.Now().Unix(),
|
||||
NumberOfChanges: changesToReturn,
|
||||
}
|
||||
communication.HandleMotion <- dataToPass //Save data to the channel
|
||||
imageArray[0] = imageArray[1]
|
||||
imageArray[1] = imageArray[2]
|
||||
i++
|
||||
}
|
||||
|
||||
imageArray[0] = imageArray[1]
|
||||
imageArray[1] = imageArray[2]
|
||||
i++
|
||||
}
|
||||
|
||||
if img != nil {
|
||||
img = nil
|
||||
}
|
||||
}
|
||||
|
||||
frame.Free()
|
||||
}
|
||||
|
||||
log.Log.Debug("ProcessMotion: finished")
|
||||
@@ -216,30 +198,25 @@ func FindMotion(imageArray [3]*image.Gray, coordinatesToCheck []int, pixelChange
|
||||
return changes > pixelChangeThreshold, changes
|
||||
}
|
||||
|
||||
func GetGrayImage(pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*image.Gray, error) {
|
||||
img, err := capture.DecodeImage(pkt, dec, decoderMutex)
|
||||
func GetGrayImage(frame *ffmpeg.VideoFrame, pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*image.Gray, error) {
|
||||
_, err := capture.DecodeImage(frame, pkt, dec, decoderMutex)
|
||||
|
||||
// Do a deep copy of the image
|
||||
imgDeepCopy := image.NewGray(img.ImageGray.Bounds())
|
||||
imgDeepCopy.Stride = img.ImageGray.Stride
|
||||
copy(imgDeepCopy.Pix, img.ImageGray.Pix)
|
||||
|
||||
// Cleanup of underlaying data
|
||||
img.Free()
|
||||
imgDeepCopy := image.NewGray(frame.ImageGray.Bounds())
|
||||
imgDeepCopy.Stride = frame.ImageGray.Stride
|
||||
copy(imgDeepCopy.Pix, frame.ImageGray.Pix)
|
||||
|
||||
return imgDeepCopy, err
|
||||
}
|
||||
|
||||
func GetRawImage(pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
|
||||
img, err := capture.DecodeImage(pkt, dec, decoderMutex)
|
||||
// We'll need to free up ourselves ;) using -> img.Free()
|
||||
return img, err
|
||||
func GetRawImage(frame *ffmpeg.VideoFrame, pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
|
||||
_, err := capture.DecodeImage(frame, pkt, dec, decoderMutex)
|
||||
return frame, err
|
||||
}
|
||||
|
||||
func ImageToBytes(img image.Image) ([]byte, error) {
|
||||
buffer := new(bytes.Buffer)
|
||||
w := bufio.NewWriter(buffer)
|
||||
//err := jpeg.Encode(w, img, &jpeg.EncoderOptions{Quality: 70})
|
||||
err := jpeg.Encode(w, img, &jpeg.Options{Quality: 15})
|
||||
return buffer.Bytes(), err
|
||||
}
|
||||
@@ -256,3 +233,16 @@ func AbsDiffBitwiseAndThreshold(img1 *image.Gray, img2 *image.Gray, img3 *image.
|
||||
}
|
||||
return changes
|
||||
}
|
||||
|
||||
func StoreSnapshot(communication *models.Communication, frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
|
||||
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
buffer := new(bytes.Buffer)
|
||||
w := bufio.NewWriter(buffer)
|
||||
err := jpeg.Encode(w, &rgbImage.Image, &jpeg.Options{Quality: 15})
|
||||
if err == nil {
|
||||
snapshot := base64.StdEncoding.EncodeToString(buffer.Bytes())
|
||||
communication.Image = snapshot
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
15
machinery/src/models/Camera.go
Normal file
15
machinery/src/models/Camera.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package models
|
||||
|
||||
import "github.com/kerberos-io/joy4/av"
|
||||
|
||||
type Camera struct {
|
||||
Width int
|
||||
Height int
|
||||
Num int
|
||||
Denum int
|
||||
Framerate float64
|
||||
RTSP string
|
||||
SubRTSP string
|
||||
Codec av.CodecType
|
||||
Initialized bool
|
||||
}
|
||||
@@ -28,8 +28,10 @@ type Communication struct {
|
||||
HandleONVIF chan OnvifAction
|
||||
IsConfiguring *abool.AtomicBool
|
||||
Queue *pubsub.Queue
|
||||
SubQueue *pubsub.Queue
|
||||
DecoderMutex *sync.Mutex
|
||||
SubDecoderMutex *sync.Mutex
|
||||
Decoder *ffmpeg.VideoDecoder
|
||||
SubDecoder *ffmpeg.VideoDecoder
|
||||
Image string
|
||||
}
|
||||
|
||||
@@ -51,6 +51,9 @@ type Capture struct {
|
||||
RaspiCamera RaspiCamera `json:"raspicamera"`
|
||||
Recording string `json:"recording,omitempty"`
|
||||
Continuous string `json:"continuous,omitempty"`
|
||||
Snapshots string `json:"snapshots,omitempty"`
|
||||
Motion string `json:"motion,omitempty"`
|
||||
Liveview string `json:"liveview,omitempty"`
|
||||
PostRecording int64 `json:"postrecording"`
|
||||
PreRecording int64 `json:"prerecording"`
|
||||
MaxLengthRecording int64 `json:"maxlengthrecording"`
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
package models
|
||||
|
||||
type System struct {
|
||||
CPUId string `json:"cpu_idle" bson:"cpu_idle"`
|
||||
Hostname string `json:"hostname" bson:"hostname"`
|
||||
Version string `json:"version" bson:"version"`
|
||||
Release string `json:"release" bson:"release"`
|
||||
BootTime uint64 `json:"boot_time" bson:"boot_time"`
|
||||
KernelVersion string `json:"kernel_version" bson:"kernel_version"`
|
||||
MACs []string `json:"macs" bson:"macs"`
|
||||
IPs []string `json:"ips" bson:"ips"`
|
||||
Architecture string `json:"architecture" bson:"architecture"`
|
||||
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
|
||||
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
|
||||
FreeMemory uint64 `json:"free_memory" bson:"free_memory"`
|
||||
CPUId string `json:"cpu_idle" bson:"cpu_idle"`
|
||||
Hostname string `json:"hostname" bson:"hostname"`
|
||||
Version string `json:"version" bson:"version"`
|
||||
Release string `json:"release" bson:"release"`
|
||||
BootTime uint64 `json:"boot_time" bson:"boot_time"`
|
||||
KernelVersion string `json:"kernel_version" bson:"kernel_version"`
|
||||
MACs []string `json:"macs" bson:"macs"`
|
||||
IPs []string `json:"ips" bson:"ips"`
|
||||
Architecture string `json:"architecture" bson:"architecture"`
|
||||
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
|
||||
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
|
||||
FreeMemory uint64 `json:"free_memory" bson:"free_memory"`
|
||||
ProcessUsedMemory uint64 `json:"process_used_memory" bson:"process_used_memory"`
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
|
||||
"config": configuration.Config,
|
||||
"custom": configuration.CustomConfig,
|
||||
"global": configuration.GlobalConfig,
|
||||
"snapshot": components.GetSnapshot(),
|
||||
"snapshot": communication.Image,
|
||||
})
|
||||
})
|
||||
|
||||
@@ -157,7 +157,7 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configuratio
|
||||
"config": configuration.Config,
|
||||
"custom": configuration.CustomConfig,
|
||||
"global": configuration.GlobalConfig,
|
||||
"snapshot": components.GetSnapshot(),
|
||||
"snapshot": communication.Image,
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -193,8 +193,16 @@ func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuratio
|
||||
})
|
||||
}
|
||||
|
||||
func DisconnectMQTT(mqttClient mqtt.Client) {
|
||||
func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) {
|
||||
if mqttClient != nil {
|
||||
// Cleanup all subscriptions.
|
||||
mqttClient.Unsubscribe("kerberos/" + config.HubKey + "/device/" + config.Key + "/request-live")
|
||||
mqttClient.Unsubscribe(config.Key + "/register")
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + config.Key)
|
||||
mqttClient.Unsubscribe("kerberos/webrtc/peers/" + config.Key)
|
||||
mqttClient.Unsubscribe("candidate/cloud")
|
||||
mqttClient.Unsubscribe("kerberos/onvif/" + config.Key)
|
||||
mqttClient.Disconnect(1000)
|
||||
mqttClient = nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/kerberos-io/agent/machinery/src/computervision"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
@@ -129,22 +130,23 @@ func ForwardSDStream(ctx context.Context, clientID string, connection *Connectio
|
||||
decoder := communication.Decoder
|
||||
decoderMutex := communication.DecoderMutex
|
||||
|
||||
// Allocate ffmpeg.VideoFrame
|
||||
frame := ffmpeg.AllocVideoFrame()
|
||||
|
||||
logreader:
|
||||
for {
|
||||
var encodedImage string
|
||||
if cursor != nil && decoder != nil {
|
||||
if queue != nil && cursor != nil && decoder != nil {
|
||||
pkt, err := cursor.ReadPacket()
|
||||
if err == nil {
|
||||
if !pkt.IsKeyFrame {
|
||||
continue
|
||||
}
|
||||
img, err := computervision.GetRawImage(pkt, decoder, decoderMutex)
|
||||
img, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
|
||||
if err == nil {
|
||||
bytes, _ := computervision.ImageToBytes(&img.Image)
|
||||
encodedImage = base64.StdEncoding.EncodeToString(bytes)
|
||||
}
|
||||
// Cleanup the image.
|
||||
img.Free()
|
||||
} else {
|
||||
log.Log.Error("ForwardSDStream:" + err.Error())
|
||||
break logreader
|
||||
@@ -169,5 +171,8 @@ logreader:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
frame.Free()
|
||||
|
||||
log.Log.Info("ForwardSDStream: stop sending streaming over websocket")
|
||||
}
|
||||
|
||||
@@ -253,7 +253,7 @@ func CreateFragmentedMP4(fullName string, fragmentedDuration int64) {
|
||||
path, _ := os.Getwd()
|
||||
duration := fragmentedDuration * 1000
|
||||
// This timescale is crucial, as it should be the same as the one defined in JOY4.
|
||||
cmd := exec.Command("mp4fragment", "--timescale", "10000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
|
||||
cmd := exec.Command("mp4fragment", "--timescale", "10000000", "--fragment-duration", strconv.FormatInt(duration, 10), fullName, fullName+"f.mp4")
|
||||
cmd.Dir = path
|
||||
log.Log.Info(cmd.String())
|
||||
var out bytes.Buffer
|
||||
@@ -271,108 +271,3 @@ func CreateFragmentedMP4(fullName string, fragmentedDuration int64) {
|
||||
os.Remove(fullName)
|
||||
os.Rename(fullName+"f.mp4", fullName)
|
||||
}
|
||||
|
||||
/*func FloatToString(input_num float64) string {
|
||||
// to convert a float number to a string
|
||||
return strconv.FormatFloat(input_num, 'f', 6, 64)
|
||||
}
|
||||
|
||||
func ParseFMP4(file *bytes.Reader) (error, uint64, uint64, []*mp4.MediaSegment, uint64) {
|
||||
var ftypSize, moovSize uint64
|
||||
var segments []*mp4.MediaSegment
|
||||
var duration uint64
|
||||
parsedMp4, err := mp4.DecodeFile(file, mp4.WithDecodeMode(mp4.DecModeLazyMdat))
|
||||
if parsedMp4 != nil && parsedMp4.Init != nil && err == nil {
|
||||
ftypSize = parsedMp4.Init.Ftyp.Size()
|
||||
moovSize = parsedMp4.Init.Moov.Size()
|
||||
duration = parsedMp4.Moov.Trak.Tkhd.Duration
|
||||
segments = parsedMp4.Segments
|
||||
}
|
||||
return err, ftypSize, moovSize, segments, duration
|
||||
}
|
||||
|
||||
func ParseFMP4Detail(fullName string) {
|
||||
|
||||
// open fullName
|
||||
file, err := os.Open(fullName)
|
||||
if err != nil {
|
||||
log.Log.Error("Error opening file: " + err.Error())
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
fileBytes, err := ioutil.ReadAll(file)
|
||||
if err != nil {
|
||||
log.Log.Error("Error reading file: " + err.Error())
|
||||
}
|
||||
fileReader := bytes.NewReader(fileBytes)
|
||||
|
||||
err, ftypSize, moovSize, segments, dur := ParseFMP4(fileReader)
|
||||
fmt.Println("========== Fragmented details =================")
|
||||
fmt.Println(dur)
|
||||
fmt.Println(ftypSize)
|
||||
fmt.Println(moovSize)
|
||||
fmt.Println(len(segments))
|
||||
|
||||
// Calculate duration of segments
|
||||
var totalDuration uint64
|
||||
for _, segment := range segments {
|
||||
fragments := segment.Fragments
|
||||
|
||||
for _, fragment := range fragments {
|
||||
var sampleDuration uint32
|
||||
samples := fragment.Moof.Traf.Trun.Samples
|
||||
for _, sample := range samples {
|
||||
sampleDuration += sample.Dur
|
||||
}
|
||||
fmt.Println(sampleDuration)
|
||||
totalDuration += uint64(sampleDuration)
|
||||
}
|
||||
}
|
||||
|
||||
misingDuration := dur - totalDuration/100
|
||||
fmt.Println(misingDuration)
|
||||
fmt.Println(totalDuration/100 + misingDuration)
|
||||
}
|
||||
|
||||
func CreateFragmentByteRanges(log log.Logging, fileName string, ftypSize uint64, moovSize uint64, segments []*mp4.MediaSegment) (string, []models.BytesRangeOnTime) {
|
||||
size := strconv.FormatInt(int64(ftypSize+moovSize), 10)
|
||||
|
||||
var fileFragments strings.Builder
|
||||
fileFragments.WriteString("#EXT-X-MAP:URI=\"" + fileName + "\",BYTERANGE=\"" + size + "@0\"\n")
|
||||
|
||||
var bytesRangeOnTime []models.BytesRangeOnTime
|
||||
var currentTime uint32
|
||||
for _, segment := range segments {
|
||||
fragments := segment.Fragments
|
||||
for _, fragment := range fragments {
|
||||
var sampleDuration uint32
|
||||
samples := fragment.Moof.Traf.Trun.Samples
|
||||
for _, sample := range samples {
|
||||
sampleDuration += sample.Dur
|
||||
}
|
||||
currentTime = currentTime + sampleDuration
|
||||
duration := FloatToString(float64(sampleDuration / 100000))
|
||||
|
||||
startPos := fragment.Moof.StartPos
|
||||
start := strconv.FormatInt(int64(startPos), 10)
|
||||
|
||||
totalSize := fragment.Mdat.Size() + fragment.Moof.Size()
|
||||
size := strconv.FormatInt(int64(totalSize), 10)
|
||||
|
||||
fileFragments.WriteString("#EXTINF:" + duration + ",\n") // @TODO calculate the duration
|
||||
fileFragments.WriteString("#EXT-X-BYTERANGE:" + size + "@" + start + "\n")
|
||||
fileFragments.WriteString(fileName + "\n")
|
||||
|
||||
byteRange := models.BytesRangeOnTime{
|
||||
Duration: duration,
|
||||
Time: FloatToString(float64(currentTime) / 100000),
|
||||
Range: size + "@" + start,
|
||||
}
|
||||
bytesRangeOnTime = append(bytesRangeOnTime, byteRange)
|
||||
}
|
||||
}
|
||||
|
||||
bytesRanges := fileFragments.String()
|
||||
log.Info("Fragments calculate from " + fileName + ": " + bytesRanges)
|
||||
return bytesRanges, bytesRangeOnTime
|
||||
}*/
|
||||
|
||||
@@ -5,8 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -152,8 +150,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if err := peerConnection.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
runtime.GC()
|
||||
debug.FreeOSMemory()
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
|
||||
atomic.AddInt64(&peerConnectionCount, 1)
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
|
||||
@@ -316,7 +312,8 @@ func WriteToTrack(livestreamCursor *pubsub.QueueCursor, configuration *models.Co
|
||||
}
|
||||
|
||||
if config.Capture.TranscodingWebRTC == "true" {
|
||||
decoderMutex.Lock()
|
||||
|
||||
/*decoderMutex.Lock()
|
||||
decoder.SetFramerate(30, 1)
|
||||
frame, err := decoder.Decode(pkt.Data)
|
||||
decoderMutex.Unlock()
|
||||
@@ -332,10 +329,8 @@ func WriteToTrack(livestreamCursor *pubsub.QueueCursor, configuration *models.Co
|
||||
pkt = _outpkts[0]
|
||||
codecData, _ = encoder.CodecData()
|
||||
}
|
||||
}
|
||||
if frame != nil {
|
||||
frame.Free()
|
||||
}
|
||||
}*/
|
||||
|
||||
}
|
||||
|
||||
switch int(pkt.Idx) {
|
||||
|
||||
Reference in New Issue
Block a user