mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 21:50:21 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2681bd2fe3 | ||
|
|
93adb3dabc | ||
|
|
0e15e58a88 | ||
|
|
ef2ea999df |
4
.github/workflows/docker-dev.yml
vendored
4
.github/workflows/docker-dev.yml
vendored
@@ -6,6 +6,8 @@ on:
|
||||
|
||||
jobs:
|
||||
build-amd64:
|
||||
# If contains the keyword "#release" in the commit message.
|
||||
if: ${{ !contains(github.event.head_commit.message, '#release') }}
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -31,6 +33,8 @@ jobs:
|
||||
- name: Create new and append to latest manifest
|
||||
run: docker buildx imagetools create -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
|
||||
build-other:
|
||||
# If contains the keyword "#release" in the commit message.
|
||||
if: ${{ !contains(github.event.head_commit.message, '#release') }}
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
|
||||
5
.github/workflows/docker.yml
vendored
5
.github/workflows/docker.yml
vendored
@@ -2,6 +2,7 @@ name: Docker master build
|
||||
|
||||
on:
|
||||
push:
|
||||
# If pushed to master branch.
|
||||
branches: [ master ]
|
||||
|
||||
env:
|
||||
@@ -9,6 +10,8 @@ env:
|
||||
|
||||
jobs:
|
||||
build-amd64:
|
||||
# If contains the keyword "#release" in the commit message.
|
||||
if: "!contains(github.event.head_commit.message, '#release')"
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
@@ -67,6 +70,8 @@ jobs:
|
||||
#- name: Use Snapcraft
|
||||
# run: tar -xf agent-${{matrix.architecture}}.tar && snapcraft
|
||||
build-other:
|
||||
# If contains the keyword "#release" in the commit message.
|
||||
if: ${{ !contains(github.event.head_commit.message, '#release') }}
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
@@ -351,7 +351,7 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// Start the RTSP client, and start reading packets.
|
||||
func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
|
||||
func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
|
||||
log.Log.Debug("capture.golibrtsp.Start(): started")
|
||||
|
||||
// called when a MULAW audio RTP packet arrives
|
||||
@@ -527,10 +527,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
|
||||
if idrPresent {
|
||||
// Increment packets, so we know the device
|
||||
// is not blocking.
|
||||
r := communication.PackageCounter.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounter.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimer.Store(time.Now().Unix())
|
||||
if streamType == "main" {
|
||||
r := communication.PackageCounter.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounter.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimer.Store(time.Now().Unix())
|
||||
} else if streamType == "sub" {
|
||||
r := communication.PackageCounterSub.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounterSub.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimerSub.Store(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -637,10 +644,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
|
||||
if isRandomAccess {
|
||||
// Increment packets, so we know the device
|
||||
// is not blocking.
|
||||
r := communication.PackageCounter.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounter.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimer.Store(time.Now().Unix())
|
||||
if streamType == "main" {
|
||||
r := communication.PackageCounter.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounter.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimer.Store(time.Now().Unix())
|
||||
} else if streamType == "sub" {
|
||||
r := communication.PackageCounterSub.Load().(int64)
|
||||
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
|
||||
communication.PackageCounterSub.Store((r + 1) % 1000)
|
||||
communication.LastPacketTimerSub.Store(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ type RTSPClient interface {
|
||||
ConnectBackChannel(ctx context.Context) error
|
||||
|
||||
// Start the RTSP client, and start reading packets.
|
||||
Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error
|
||||
Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error
|
||||
|
||||
// Start the RTSP client, and start reading packets.
|
||||
StartBackChannel(ctx context.Context) (err error)
|
||||
|
||||
@@ -324,7 +324,7 @@ loop:
|
||||
// Try again
|
||||
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
|
||||
if err != nil {
|
||||
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
|
||||
log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,12 +36,20 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
|
||||
packageCounter.Store(int64(0))
|
||||
communication.PackageCounter = &packageCounter
|
||||
|
||||
var packageCounterSub atomic.Value
|
||||
packageCounterSub.Store(int64(0))
|
||||
communication.PackageCounterSub = &packageCounterSub
|
||||
|
||||
// This is used when the last packet was received (timestamp),
|
||||
// this metric is used to determine if the camera is still online/connected.
|
||||
var lastPacketTimer atomic.Value
|
||||
packageCounter.Store(int64(0))
|
||||
communication.LastPacketTimer = &lastPacketTimer
|
||||
|
||||
var lastPacketTimerSub atomic.Value
|
||||
packageCounterSub.Store(int64(0))
|
||||
communication.LastPacketTimerSub = &lastPacketTimerSub
|
||||
|
||||
// This is used to understand if we have a working Kerberos Hub connection
|
||||
// cloudTimestamp will be updated when successfully sending heartbeats.
|
||||
var cloudTimestamp atomic.Value
|
||||
@@ -245,7 +253,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
log.Log.Info("components.Kerberos.RunAgent(): SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
|
||||
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
|
||||
queue.WriteHeader(videoStreams)
|
||||
go rtspClient.Start(context.Background(), queue, configuration, communication)
|
||||
go rtspClient.Start(context.Background(), "main", queue, configuration, communication)
|
||||
|
||||
// Main stream is connected and ready to go.
|
||||
communication.MainStreamConnected = true
|
||||
|
||||
// Try to create backchannel
|
||||
rtspBackChannelClient := captureDevice.SetBackChannelClient(rtspUrl)
|
||||
@@ -261,7 +272,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
communication.SubQueue = subQueue
|
||||
subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
|
||||
subQueue.WriteHeader(videoSubStreams)
|
||||
go rtspSubClient.Start(context.Background(), subQueue, configuration, communication)
|
||||
go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)
|
||||
|
||||
// Sub stream is connected and ready to go.
|
||||
communication.SubStreamConnected = true
|
||||
}
|
||||
|
||||
// Handle livestream SD (low resolution over MQTT)
|
||||
@@ -320,6 +334,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
|
||||
// If we reach this point, we are stopping the stream.
|
||||
communication.CameraConnected = false
|
||||
communication.MainStreamConnected = false
|
||||
communication.SubStreamConnected = false
|
||||
|
||||
// Cancel the main context, this will stop all the other goroutines.
|
||||
(*communication.CancelContext)()
|
||||
@@ -397,14 +413,19 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
func ControlAgent(communication *models.Communication) {
|
||||
log.Log.Debug("components.Kerberos.ControlAgent(): started")
|
||||
packageCounter := communication.PackageCounter
|
||||
packageSubCounter := communication.PackageCounterSub
|
||||
go func() {
|
||||
// A channel to check the camera activity
|
||||
var previousPacket int64 = 0
|
||||
var previousPacketSub int64 = 0
|
||||
var occurence = 0
|
||||
var occurenceSub = 0
|
||||
for {
|
||||
|
||||
// If camera is connected, we'll check if we are still receiving packets.
|
||||
if communication.CameraConnected {
|
||||
|
||||
// First we'll check the main stream.
|
||||
packetsR := packageCounter.Load().(int64)
|
||||
if packetsR == previousPacket {
|
||||
// If we are already reconfiguring,
|
||||
@@ -416,16 +437,42 @@ func ControlAgent(communication *models.Communication) {
|
||||
occurence = 0
|
||||
}
|
||||
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read " + strconv.FormatInt(packetsR, 10))
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from main stream: " + strconv.FormatInt(packetsR, 10))
|
||||
|
||||
// After 15 seconds without activity this is thrown..
|
||||
if occurence == 3 {
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery.")
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking main stream.")
|
||||
communication.HandleBootstrap <- "restart"
|
||||
time.Sleep(2 * time.Second)
|
||||
occurence = 0
|
||||
}
|
||||
|
||||
// Now we'll check the sub stream.
|
||||
packetsSubR := packageSubCounter.Load().(int64)
|
||||
if communication.SubStreamConnected {
|
||||
if packetsSubR == previousPacketSub {
|
||||
// If we are already reconfiguring,
|
||||
// we dont need to check if the stream is blocking.
|
||||
if !communication.IsConfiguring.IsSet() {
|
||||
occurenceSub = occurenceSub + 1
|
||||
}
|
||||
} else {
|
||||
occurenceSub = 0
|
||||
}
|
||||
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from sub stream: " + strconv.FormatInt(packetsSubR, 10))
|
||||
|
||||
// After 15 seconds without activity this is thrown..
|
||||
if occurenceSub == 3 {
|
||||
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking sub stream.")
|
||||
communication.HandleBootstrap <- "restart"
|
||||
time.Sleep(2 * time.Second)
|
||||
occurenceSub = 0
|
||||
}
|
||||
}
|
||||
|
||||
previousPacket = packageCounter.Load().(int64)
|
||||
previousPacketSub = packageSubCounter.Load().(int64)
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
@@ -15,6 +15,8 @@ type Communication struct {
|
||||
CancelContext *context.CancelFunc
|
||||
PackageCounter *atomic.Value
|
||||
LastPacketTimer *atomic.Value
|
||||
PackageCounterSub *atomic.Value
|
||||
LastPacketTimerSub *atomic.Value
|
||||
CloudTimestamp *atomic.Value
|
||||
HandleBootstrap chan string
|
||||
HandleStream chan string
|
||||
@@ -33,5 +35,7 @@ type Communication struct {
|
||||
SubQueue *packets.Queue
|
||||
Image string
|
||||
CameraConnected bool
|
||||
MainStreamConnected bool
|
||||
SubStreamConnected bool
|
||||
HasBackChannel bool
|
||||
}
|
||||
|
||||
@@ -992,7 +992,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
|
||||
stringBody := string(b2)
|
||||
decodedXML, et, err := getXMLNode(stringBody, "CreatePullPointSubscriptionResponse")
|
||||
if err != nil {
|
||||
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())
|
||||
log.Log.Debug("onvif.main.CreatePullPointSubscription(): " + err.Error())
|
||||
} else {
|
||||
if err := decodedXML.DecodeElement(&createPullPointSubscriptionResponse, et); err != nil {
|
||||
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())
|
||||
|
||||
Reference in New Issue
Block a user