Compare commits

..

8 Commits

Author SHA1 Message Date
Cedric Verstraeten
aac2150a3a [release] v3.1.1 2024-01-07 22:14:44 +01:00
Cedric Verstraeten
9b713637b9 change version number of ui 2024-01-07 21:44:32 +01:00
Cedric Verstraeten
699660d472 only make release when putting [release] 2024-01-07 21:41:32 +01:00
Cedric Verstraeten
751aa17534 feature: make hub encryption configurable + only send heartbeat to vault when credentials are set 2024-01-07 21:30:57 +01:00
Cedric Verstraeten
2681bd2fe3 hot fix: keep track of main and sub stream separately (one of them might block) 2024-01-07 20:20:51 +01:00
Cedric Verstraeten
93adb3dabc different order in action 2024-01-07 08:29:53 +01:00
Cedric Verstraeten
0e15e58a88 try once more different format 2024-01-07 08:26:34 +01:00
Cedric Verstraeten
ef2ea999df only run release to docker when containing [release] 2024-01-07 08:22:24 +01:00
14 changed files with 106 additions and 19 deletions

View File

@@ -6,6 +6,8 @@ on:
jobs:
build-amd64:
# If contains the keyword "#release" in the commit message.
if: ${{ !contains(github.event.head_commit.message, '#release') }}
runs-on: ubuntu-latest
strategy:
matrix:
@@ -31,6 +33,8 @@ jobs:
- name: Create new and append to latest manifest
run: docker buildx imagetools create -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
# If contains the keyword "#release" in the commit message.
if: ${{ !contains(github.event.head_commit.message, '#release') }}
runs-on: ubuntu-latest
strategy:
matrix:

View File

@@ -7,6 +7,8 @@ on:
jobs:
build-amd64:
# If contains the keyword "[release]" in the commit message.
if: "contains(github.event.head_commit.message, '[release]')"
runs-on: ubuntu-latest
strategy:
matrix:
@@ -30,6 +32,8 @@ jobs:
- name: Create new and append to manifest
run: cd agent && docker buildx imagetools create -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
# If contains the keyword "[release]" in the commit message.
if: "contains(github.event.head_commit.message, '[release]')"
runs-on: ubuntu-latest
strategy:
matrix:

View File

@@ -2,6 +2,7 @@ name: Docker master build
on:
push:
# If pushed to master branch.
branches: [ master ]
env:
@@ -9,6 +10,8 @@ env:
jobs:
build-amd64:
# If contains the keyword "[release]" in the commit message.
if: "contains(github.event.head_commit.message, '[release]')"
runs-on: ubuntu-latest
permissions:
contents: write
@@ -67,6 +70,8 @@ jobs:
#- name: Use Snapcraft
# run: tar -xf agent-${{matrix.architecture}}.tar && snapcraft
build-other:
# If contains the keyword "[release]" in the commit message.
if: "contains(github.event.head_commit.message, '[release]')"
runs-on: ubuntu-latest
permissions:
contents: write

View File

@@ -226,6 +226,7 @@ Next to attaching the configuration file, it is also possible to override the co
| `AGENT_TURN_USERNAME` | TURN username used for WebRTC. | "username1" |
| `AGENT_TURN_PASSWORD` | TURN password used for WebRTC. | "password1" |
| `AGENT_CLOUD` | Store recordings in Kerberos Hub (s3), Kerberos Vault (kstorage) or Dropbox (dropbox). | "s3" |
| `AGENT_HUB_ENCRYPTION` | Turning on/off encryption of traffic from your Kerberos Agent to Kerberos Hub. | "true" |
| `AGENT_HUB_URI` | The Kerberos Hub API, defaults to our Kerberos Hub SAAS. | "https://api.hub.domain.com" |
| `AGENT_HUB_KEY` | The access key linked to your account in Kerberos Hub. | "" |
| `AGENT_HUB_PRIVATE_KEY` | The secret access key linked to your account in Kerberos Hub. | "" |

View File

@@ -107,6 +107,7 @@
"turn_username": "username1",
"turn_password": "password1",
"heartbeaturi": "",
"hub_encryption": "true",
"hub_uri": "https://api.cloud.kerberos.io",
"hub_key": "",
"hub_private_key": "",

View File

@@ -351,7 +351,7 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context) (err error) {
}
// Start the RTSP client, and start reading packets.
func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
log.Log.Debug("capture.golibrtsp.Start(): started")
// called when a MULAW audio RTP packet arrives
@@ -527,10 +527,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
if idrPresent {
// Increment packets, so we know the device
// is not blocking.
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
if streamType == "main" {
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
} else if streamType == "sub" {
r := communication.PackageCounterSub.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounterSub.Store((r + 1) % 1000)
communication.LastPacketTimerSub.Store(time.Now().Unix())
}
}
}
@@ -637,10 +644,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
if isRandomAccess {
// Increment packets, so we know the device
// is not blocking.
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
if streamType == "main" {
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
} else if streamType == "sub" {
r := communication.PackageCounterSub.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounterSub.Store((r + 1) % 1000)
communication.LastPacketTimerSub.Store(time.Now().Unix())
}
}
}

View File

@@ -44,7 +44,7 @@ type RTSPClient interface {
ConnectBackChannel(ctx context.Context) error
// Start the RTSP client, and start reading packets.
Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error
Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error
// Start the RTSP client, and start reading packets.
StartBackChannel(ctx context.Context) (err error)

View File

@@ -324,7 +324,7 @@ loop:
// Try again
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
}
@@ -450,8 +450,9 @@ loop:
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, onvifEventsList, cameraConnected, hasBackChannel)
// Get the private key to encrypt the data using symmetric encryption: AES.
HubEncryption := config.HubEncryption
privateKey := config.HubPrivateKey
if privateKey != "" {
if HubEncryption == "true" && privateKey != "" {
// Encrypt the data using AES.
encrypted, err := encryption.AesEncrypt([]byte(object), privateKey)
if err != nil {
@@ -492,7 +493,9 @@ loop:
// If we have a Kerberos Vault connected, we will also send some analytics
// to that service.
vaultURI = config.KStorage.URI
if vaultURI != "" {
accessKey := config.KStorage.AccessKey
secretAccessKey := config.KStorage.SecretAccessKey
if vaultURI != "" && accessKey != "" && secretAccessKey != "" {
var object = fmt.Sprintf(`{
"key" : "%s",

View File

@@ -36,12 +36,20 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
packageCounter.Store(int64(0))
communication.PackageCounter = &packageCounter
var packageCounterSub atomic.Value
packageCounterSub.Store(int64(0))
communication.PackageCounterSub = &packageCounterSub
// This is used when the last packet was received (timestamp),
// this metric is used to determine if the camera is still online/connected.
var lastPacketTimer atomic.Value
packageCounter.Store(int64(0))
communication.LastPacketTimer = &lastPacketTimer
var lastPacketTimerSub atomic.Value
packageCounterSub.Store(int64(0))
communication.LastPacketTimerSub = &lastPacketTimerSub
// This is used to understand if we have a working Kerberos Hub connection
// cloudTimestamp will be updated when successfully sending heartbeats.
var cloudTimestamp atomic.Value
@@ -245,7 +253,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
log.Log.Info("components.Kerberos.RunAgent(): SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
queue.WriteHeader(videoStreams)
go rtspClient.Start(context.Background(), queue, configuration, communication)
go rtspClient.Start(context.Background(), "main", queue, configuration, communication)
// Main stream is connected and ready to go.
communication.MainStreamConnected = true
// Try to create backchannel
rtspBackChannelClient := captureDevice.SetBackChannelClient(rtspUrl)
@@ -261,7 +272,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
communication.SubQueue = subQueue
subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.WriteHeader(videoSubStreams)
go rtspSubClient.Start(context.Background(), subQueue, configuration, communication)
go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)
// Sub stream is connected and ready to go.
communication.SubStreamConnected = true
}
// Handle livestream SD (low resolution over MQTT)
@@ -320,6 +334,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
// If we reach this point, we are stopping the stream.
communication.CameraConnected = false
communication.MainStreamConnected = false
communication.SubStreamConnected = false
// Cancel the main context, this will stop all the other goroutines.
(*communication.CancelContext)()
@@ -397,14 +413,19 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
func ControlAgent(communication *models.Communication) {
log.Log.Debug("components.Kerberos.ControlAgent(): started")
packageCounter := communication.PackageCounter
packageSubCounter := communication.PackageCounterSub
go func() {
// A channel to check the camera activity
var previousPacket int64 = 0
var previousPacketSub int64 = 0
var occurence = 0
var occurenceSub = 0
for {
// If camera is connected, we'll check if we are still receiving packets.
if communication.CameraConnected {
// First we'll check the main stream.
packetsR := packageCounter.Load().(int64)
if packetsR == previousPacket {
// If we are already reconfiguring,
@@ -416,16 +437,42 @@ func ControlAgent(communication *models.Communication) {
occurence = 0
}
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read " + strconv.FormatInt(packetsR, 10))
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from mainstream: " + strconv.FormatInt(packetsR, 10))
// After 15 seconds without activity this is thrown..
if occurence == 3 {
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery.")
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking mainstream.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurence = 0
}
// Now we'll check the sub stream.
packetsSubR := packageSubCounter.Load().(int64)
if communication.SubStreamConnected {
if packetsSubR == previousPacketSub {
// If we are already reconfiguring,
// we dont need to check if the stream is blocking.
if !communication.IsConfiguring.IsSet() {
occurenceSub = occurenceSub + 1
}
} else {
occurenceSub = 0
}
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from substream: " + strconv.FormatInt(packetsSubR, 10))
// After 15 seconds without activity this is thrown..
if occurenceSub == 3 {
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurenceSub = 0
}
}
previousPacket = packageCounter.Load().(int64)
previousPacketSub = packageSubCounter.Load().(int64)
}
time.Sleep(5 * time.Second)

View File

@@ -406,6 +406,9 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
break
/* When connected and storing in Kerberos Hub (SAAS) */
case "AGENT_HUB_ENCRYPTION":
configuration.Config.HubEncryption = value
break
case "AGENT_HUB_URI":
configuration.Config.HubURI = value
break

View File

@@ -15,6 +15,8 @@ type Communication struct {
CancelContext *context.CancelFunc
PackageCounter *atomic.Value
LastPacketTimer *atomic.Value
PackageCounterSub *atomic.Value
LastPacketTimerSub *atomic.Value
CloudTimestamp *atomic.Value
HandleBootstrap chan string
HandleStream chan string
@@ -33,5 +35,7 @@ type Communication struct {
SubQueue *packets.Queue
Image string
CameraConnected bool
MainStreamConnected bool
SubStreamConnected bool
HasBackChannel bool
}

View File

@@ -37,6 +37,7 @@ type Config struct {
TURNUsername string `json:"turn_username" bson:"turn_username"`
TURNPassword string `json:"turn_password" bson:"turn_password"`
HeartbeatURI string `json:"heartbeaturi" bson:"heartbeaturi"` /*obsolete*/
HubEncryption string `json:"hub_encryption" bson:"hub_encryption"`
HubURI string `json:"hub_uri" bson:"hub_uri"`
HubKey string `json:"hub_key" bson:"hub_key"`
HubPrivateKey string `json:"hub_private_key" bson:"hub_private_key"`

View File

@@ -992,7 +992,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
stringBody := string(b2)
decodedXML, et, err := getXMLNode(stringBody, "CreatePullPointSubscriptionResponse")
if err != nil {
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())
log.Log.Debug("onvif.main.CreatePullPointSubscription(): " + err.Error())
} else {
if err := decodedXML.DecodeElement(&createPullPointSubscriptionResponse, et); err != nil {
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())

View File

@@ -100,7 +100,7 @@ class App extends React.Component {
</div>
)}
<div id="page-root">
<Sidebar logo={logo} title="Kerberos Agent" version="v1-beta" mobile>
<Sidebar logo={logo} title="Kerberos Agent" version="v3.1.1" mobile>
<Profilebar
username={username}
email="support@kerberos.io"