mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-12 15:59:13 +00:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0131b87692 | ||
|
|
54e8198b65 | ||
|
|
3bfb68f950 | ||
|
|
c05e59c936 | ||
|
|
b42d63b668 | ||
|
|
0ca007e424 | ||
|
|
229d085de7 | ||
|
|
30e2b8318d | ||
|
|
dbcf4e242c | ||
|
|
ccf4034cc8 | ||
|
|
a34836e8f4 | ||
|
|
dd1464d1be | ||
|
|
2c02e0aeb1 | ||
|
|
d5464362bb | ||
|
|
5bcefd0015 | ||
|
|
5bb9def42d | ||
|
|
ff38ccbadf | ||
|
|
f64e899de9 |
51
.github/workflows/pr-build.yml
vendored
51
.github/workflows/pr-build.yml
vendored
@@ -7,61 +7,34 @@ env:
|
||||
REPO: kerberos/agent
|
||||
|
||||
jobs:
|
||||
build-amd64:
|
||||
runs-on: ubuntu-24.04
|
||||
build:
|
||||
runs-on: ${{ matrix.runner }}
|
||||
permissions:
|
||||
contents: write
|
||||
strategy:
|
||||
matrix:
|
||||
architecture: [amd64]
|
||||
include:
|
||||
- architecture: amd64
|
||||
runner: ubuntu-24.04
|
||||
dockerfile: Dockerfile
|
||||
- architecture: arm64
|
||||
runner: ubuntu-24.04-arm
|
||||
dockerfile: Dockerfile.arm64
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
- uses: benjlevesque/short-sha@v2.1
|
||||
id: short-sha
|
||||
with:
|
||||
length: 7
|
||||
- name: Run Build
|
||||
run: |
|
||||
docker build -t ${{matrix.architecture}} .
|
||||
CID=$(docker create ${{matrix.architecture}})
|
||||
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
|
||||
docker rm ${CID}
|
||||
- name: Strip binary
|
||||
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
|
||||
- name: Upload artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: agent-${{matrix.architecture}}.tar
|
||||
path: agent-${{matrix.architecture}}.tar
|
||||
|
||||
build-arm64:
|
||||
runs-on: ubuntu-24.04-arm
|
||||
permissions:
|
||||
contents: write
|
||||
strategy:
|
||||
matrix:
|
||||
architecture: [arm64]
|
||||
steps:
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
- uses: benjlevesque/short-sha@v2.1
|
||||
id: short-sha
|
||||
with:
|
||||
length: 7
|
||||
- name: Run Build
|
||||
run: |
|
||||
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
|
||||
docker build -t ${{ matrix.architecture }} -f ${{ matrix.dockerfile }} .
|
||||
CID=$(docker create ${{matrix.architecture}})
|
||||
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
|
||||
docker rm ${CID}
|
||||
|
||||
@@ -159,6 +159,19 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
|
||||
}
|
||||
|
||||
// Close mp4
|
||||
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
|
||||
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
|
||||
}
|
||||
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
|
||||
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
|
||||
}
|
||||
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
|
||||
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
|
||||
}
|
||||
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
|
||||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
|
||||
log.Log.Warning("capture.main.HandleRecordStream(continuous): closing MP4 without full parameter sets, moov may be incomplete")
|
||||
}
|
||||
mp4Video.Close(&config)
|
||||
log.Log.Info("capture.main.HandleRecordStream(continuous): recording finished: file save: " + name)
|
||||
|
||||
@@ -505,21 +518,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
|
||||
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
|
||||
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): missing SPS/PPS at recording start")
|
||||
}
|
||||
// Create a video file, and set the dimensions.
|
||||
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
|
||||
mp4Video.SetWidth(width)
|
||||
mp4Video.SetHeight(height)
|
||||
|
||||
if videoCodec == "H264" {
|
||||
videoTrack = mp4Video.AddVideoTrack("H264")
|
||||
} else if videoCodec == "H265" {
|
||||
videoTrack = mp4Video.AddVideoTrack("H265")
|
||||
}
|
||||
if audioCodec == "AAC" {
|
||||
audioTrack = mp4Video.AddAudioTrack("AAC")
|
||||
} else if audioCodec == "PCM_MULAW" {
|
||||
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
|
||||
}
|
||||
// Create the MP4 only once the first keyframe arrives.
|
||||
var mp4Video *video.MP4
|
||||
|
||||
for cursorError == nil {
|
||||
|
||||
@@ -538,7 +538,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
|
||||
default:
|
||||
}
|
||||
|
||||
if (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
|
||||
if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
|
||||
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
|
||||
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
|
||||
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
|
||||
@@ -548,20 +548,44 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
|
||||
// We start the recording if we have a keyframe and the last duration is 0 or less than the current packet time.
|
||||
// It could be start we start from the beginning of the recording.
|
||||
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): write frames")
|
||||
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): recording started on keyframe")
|
||||
|
||||
// Align duration timers with the first keyframe.
|
||||
startRecording = pkt.CurrentTime
|
||||
|
||||
// Create a video file, and set the dimensions.
|
||||
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
|
||||
mp4Video.SetWidth(width)
|
||||
mp4Video.SetHeight(height)
|
||||
|
||||
if videoCodec == "H264" {
|
||||
videoTrack = mp4Video.AddVideoTrack("H264")
|
||||
} else if videoCodec == "H265" {
|
||||
videoTrack = mp4Video.AddVideoTrack("H265")
|
||||
}
|
||||
if audioCodec == "AAC" {
|
||||
audioTrack = mp4Video.AddAudioTrack("AAC")
|
||||
} else if audioCodec == "PCM_MULAW" {
|
||||
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
|
||||
}
|
||||
start = true
|
||||
}
|
||||
if start {
|
||||
pts := convertPTS(pkt.TimeLegacy)
|
||||
if pkt.IsVideo {
|
||||
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add video sample")
|
||||
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
|
||||
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
|
||||
if mp4Video != nil {
|
||||
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
|
||||
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
|
||||
}
|
||||
}
|
||||
} else if pkt.IsAudio {
|
||||
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
|
||||
if pkt.Codec == "AAC" {
|
||||
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
|
||||
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
|
||||
if mp4Video != nil {
|
||||
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
|
||||
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
|
||||
}
|
||||
}
|
||||
} else if pkt.Codec == "PCM_MULAW" {
|
||||
// TODO: transcode to AAC, some work to do..
|
||||
@@ -579,7 +603,25 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
|
||||
// This is used to determine if we need to start a new recording.
|
||||
lastRecordingTime = pkt.CurrentTime
|
||||
|
||||
if mp4Video == nil {
|
||||
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
|
||||
continue
|
||||
}
|
||||
|
||||
// This will close the recording and write the last packet.
|
||||
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
|
||||
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
|
||||
}
|
||||
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
|
||||
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
|
||||
}
|
||||
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
|
||||
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
|
||||
}
|
||||
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
|
||||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
|
||||
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): closing MP4 without full parameter sets, moov may be incomplete")
|
||||
}
|
||||
mp4Video.Close(&config)
|
||||
log.Log.Info("capture.main.HandleRecordStream(motiondetection): file save: " + name)
|
||||
|
||||
|
||||
@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
|
||||
|
||||
// This is legacy should be removed in future! Now everything
|
||||
// lives under the /api prefix.
|
||||
r.GET("/config", func(c *gin.Context) {
|
||||
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// This is legacy should be removed in future! Now everything
|
||||
// lives under the /api prefix.
|
||||
r.POST("/config", func(c *gin.Context) {
|
||||
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api := r.Group("/api")
|
||||
{
|
||||
// Public endpoints (no authentication required)
|
||||
api.POST("/login", authMiddleware.LoginHandler)
|
||||
|
||||
api.GET("/dashboard", func(c *gin.Context) {
|
||||
components.GetDashboard(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/latest-events", func(c *gin.Context) {
|
||||
components.GetLatestEvents(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/days", func(c *gin.Context) {
|
||||
components.GetDays(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/config", func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/config", func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
// Will verify the hub settings.
|
||||
api.POST("/hub/verify", func(c *gin.Context) {
|
||||
cloud.VerifyHub(c)
|
||||
})
|
||||
|
||||
// Will verify the persistence settings.
|
||||
api.POST("/persistence/verify", func(c *gin.Context) {
|
||||
cloud.VerifyPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Will verify the secondary persistence settings.
|
||||
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
|
||||
cloud.VerifySecondaryPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Camera specific methods. Doesn't require any authorization.
|
||||
// These are available for anyone, but require the agent, to reach
|
||||
// the camera.
|
||||
|
||||
api.POST("/camera/restart", func(c *gin.Context) {
|
||||
components.RestartAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/stop", func(c *gin.Context) {
|
||||
components.StopAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/record", func(c *gin.Context) {
|
||||
components.MakeRecording(c, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
|
||||
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
|
||||
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// Onvif specific methods. Doesn't require any authorization.
|
||||
// Will verify the current onvif settings.
|
||||
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
|
||||
api.POST("/camera/onvif/login", LoginToOnvif)
|
||||
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
|
||||
api.POST("/camera/onvif/presets", GetOnvifPresets)
|
||||
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
|
||||
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
|
||||
api.POST("/camera/onvif/zoom", DoOnvifZoom)
|
||||
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
|
||||
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
|
||||
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
|
||||
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
|
||||
|
||||
// Secured endpoints..
|
||||
// Apply JWT authentication middleware.
|
||||
// All routes registered below this line require a valid JWT token.
|
||||
api.Use(authMiddleware.MiddlewareFunc())
|
||||
{
|
||||
api.GET("/dashboard", func(c *gin.Context) {
|
||||
components.GetDashboard(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/latest-events", func(c *gin.Context) {
|
||||
components.GetLatestEvents(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/days", func(c *gin.Context) {
|
||||
components.GetDays(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/config", func(c *gin.Context) {
|
||||
components.GetConfig(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.POST("/config", func(c *gin.Context) {
|
||||
components.UpdateConfig(c, configDirectory, configuration, communication)
|
||||
})
|
||||
|
||||
// Will verify the hub settings.
|
||||
api.POST("/hub/verify", func(c *gin.Context) {
|
||||
cloud.VerifyHub(c)
|
||||
})
|
||||
|
||||
// Will verify the persistence settings.
|
||||
api.POST("/persistence/verify", func(c *gin.Context) {
|
||||
cloud.VerifyPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Will verify the secondary persistence settings.
|
||||
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
|
||||
cloud.VerifySecondaryPersistence(c, configDirectory)
|
||||
})
|
||||
|
||||
// Camera specific methods.
|
||||
api.POST("/camera/restart", func(c *gin.Context) {
|
||||
components.RestartAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/stop", func(c *gin.Context) {
|
||||
components.StopAgent(c, communication)
|
||||
})
|
||||
|
||||
api.POST("/camera/record", func(c *gin.Context) {
|
||||
components.MakeRecording(c, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
|
||||
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
|
||||
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
|
||||
})
|
||||
|
||||
// Onvif specific methods.
|
||||
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
|
||||
api.POST("/camera/onvif/login", LoginToOnvif)
|
||||
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
|
||||
api.POST("/camera/onvif/presets", GetOnvifPresets)
|
||||
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
|
||||
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
|
||||
api.POST("/camera/onvif/zoom", DoOnvifZoom)
|
||||
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
|
||||
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
|
||||
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
|
||||
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
|
||||
}
|
||||
}
|
||||
return api
|
||||
|
||||
@@ -90,10 +90,31 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
|
||||
|
||||
// Some extra options to make sure the connection behaves
|
||||
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
|
||||
opts.SetCleanSession(true)
|
||||
opts.SetCleanSession(false)
|
||||
opts.SetResumeSubs(true)
|
||||
opts.SetStore(mqtt.NewMemoryStore())
|
||||
opts.SetConnectRetry(true)
|
||||
//opts.SetAutoReconnect(true)
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetryInterval(5 * time.Second)
|
||||
opts.SetMaxReconnectInterval(1 * time.Minute)
|
||||
opts.SetKeepAlive(30 * time.Second)
|
||||
opts.SetPingTimeout(10 * time.Second)
|
||||
opts.SetWriteTimeout(10 * time.Second)
|
||||
opts.SetOrderMatters(false)
|
||||
opts.SetConnectTimeout(30 * time.Second)
|
||||
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
||||
if err != nil {
|
||||
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
|
||||
} else {
|
||||
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
|
||||
}
|
||||
})
|
||||
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
|
||||
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
|
||||
})
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): MQTT session is online")
|
||||
})
|
||||
|
||||
hubKey := ""
|
||||
// This is the old way ;)
|
||||
@@ -133,10 +154,14 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
|
||||
}
|
||||
}
|
||||
mqc := mqtt.NewClient(opts)
|
||||
if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) {
|
||||
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
|
||||
if token.Error() != nil {
|
||||
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
|
||||
} else {
|
||||
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): initial MQTT connection established")
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
|
||||
}
|
||||
return mqc
|
||||
}
|
||||
@@ -149,7 +174,7 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
|
||||
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
|
||||
} else {
|
||||
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
|
||||
mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
|
||||
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
// Decode the message, we are expecting following format.
|
||||
// {
|
||||
@@ -276,6 +301,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
|
||||
|
||||
}
|
||||
})
|
||||
|
||||
if token.WaitTimeout(10 * time.Second) {
|
||||
if token.Error() != nil {
|
||||
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
|
||||
} else {
|
||||
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Eyevinn/mp4ff/avc"
|
||||
mp4ff "github.com/Eyevinn/mp4ff/mp4"
|
||||
"github.com/kerberos-io/agent/machinery/src/encryption"
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
@@ -431,7 +432,12 @@ func (mp4 *MP4) Close(config *models.Config) {
|
||||
mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten, mp4.SegmentCount, mp4.FragmentKeyframeCount))
|
||||
|
||||
if mp4.VideoTotalDuration == 0 && mp4.AudioTotalDuration == 0 {
|
||||
log.Log.Error("mp4.Close(): no video or audio samples added, cannot create MP4 file")
|
||||
log.Log.Error("mp4.Close(): no video or audio samples added, removing empty MP4 file")
|
||||
mp4.Writer.Flush()
|
||||
_ = mp4.FileWriter.Sync()
|
||||
_ = mp4.FileWriter.Close()
|
||||
_ = os.Remove(mp4.FileName)
|
||||
return
|
||||
}
|
||||
|
||||
// Add final pending samples before closing
|
||||
@@ -554,9 +560,16 @@ func (mp4 *MP4) Close(config *models.Config) {
|
||||
case "H264", "AVC1":
|
||||
init.AddEmptyTrack(videoTimescale, "video", "und")
|
||||
includePS := true
|
||||
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", mp4.SPSNALUs, mp4.PPSNALUs, includePS)
|
||||
spsNALUs, ppsNALUs := normalizeH264ParameterSets(mp4.SPSNALUs, mp4.PPSNALUs)
|
||||
log.Log.Debug("mp4.Close(): AVC parameter sets: SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
|
||||
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", spsNALUs, ppsNALUs, includePS)
|
||||
if err != nil {
|
||||
log.Log.Error("mp4.Close(): error setting AVC descriptor: " + err.Error())
|
||||
if fallbackErr := addAVCDescriptorFallback(init.Moov.Traks[0], spsNALUs, ppsNALUs, uint16(mp4.width), uint16(mp4.height)); fallbackErr != nil {
|
||||
log.Log.Error("mp4.Close(): error setting AVC descriptor fallback: " + fallbackErr.Error())
|
||||
} else {
|
||||
log.Log.Warning("mp4.Close(): AVC descriptor fallback used due to SPS parse error")
|
||||
}
|
||||
}
|
||||
init.Moov.Traks[0].Tkhd.Duration = actualVideoDuration
|
||||
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
|
||||
@@ -573,7 +586,9 @@ func (mp4 *MP4) Close(config *models.Config) {
|
||||
case "H265", "HVC1":
|
||||
init.AddEmptyTrack(videoTimescale, "video", "und")
|
||||
includePS := true
|
||||
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs, [][]byte{}, includePS)
|
||||
vpsNALUs, spsNALUs, ppsNALUs := normalizeH265ParameterSets(mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs)
|
||||
log.Log.Debug("mp4.Close(): HEVC parameter sets: VPS=" + formatNaluDebug(vpsNALUs) + ", SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
|
||||
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", vpsNALUs, spsNALUs, ppsNALUs, [][]byte{}, includePS)
|
||||
if err != nil {
|
||||
log.Log.Error("mp4.Close(): error setting HEVC descriptor: " + err.Error())
|
||||
}
|
||||
@@ -589,8 +604,8 @@ func (mp4 *MP4) Close(config *models.Config) {
|
||||
init.Moov.Traks[0].Mdia.Mdhd.ModificationTime = macTime
|
||||
}
|
||||
|
||||
// Try adding audio track if available
|
||||
if mp4.AudioTrackName == "AAC" || mp4.AudioTrackName == "MP4A" {
|
||||
// Try adding audio track if available and samples were recorded.
|
||||
if (mp4.AudioTrackName == "AAC" || mp4.AudioTrackName == "MP4A") && mp4.AudioTotalDuration > 0 {
|
||||
// Add an audio track to the moov box
|
||||
init.AddEmptyTrack(audioTimescale, "audio", "und")
|
||||
|
||||
@@ -828,6 +843,172 @@ func removeAnnexBStartCode(nalu []byte) []byte {
|
||||
return nalu
|
||||
}
|
||||
|
||||
// sanitizeParameterSets removes Annex B start codes and drops empty NALUs.
|
||||
func sanitizeParameterSets(nalus [][]byte) [][]byte {
|
||||
if len(nalus) == 0 {
|
||||
return nalus
|
||||
}
|
||||
clean := make([][]byte, 0, len(nalus))
|
||||
for _, nalu := range nalus {
|
||||
trimmed := removeAnnexBStartCode(nalu)
|
||||
if len(trimmed) == 0 {
|
||||
continue
|
||||
}
|
||||
clean = append(clean, trimmed)
|
||||
}
|
||||
return clean
|
||||
}
|
||||
|
||||
// normalizeH264ParameterSets splits Annex B blobs and extracts SPS/PPS NALUs.
|
||||
func normalizeH264ParameterSets(spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte) {
|
||||
all := make([][]byte, 0, len(spsIn)+len(ppsIn))
|
||||
all = append(all, spsIn...)
|
||||
all = append(all, ppsIn...)
|
||||
var spsOut [][]byte
|
||||
var ppsOut [][]byte
|
||||
for _, blob := range all {
|
||||
for _, nalu := range splitParamSetNALUs(blob) {
|
||||
nalu = removeAnnexBStartCode(nalu)
|
||||
if len(nalu) == 0 {
|
||||
continue
|
||||
}
|
||||
typ := nalu[0] & 0x1F
|
||||
switch typ {
|
||||
case 7:
|
||||
spsOut = append(spsOut, nalu)
|
||||
case 8:
|
||||
ppsOut = append(ppsOut, nalu)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(spsOut) == 0 {
|
||||
spsOut = sanitizeParameterSets(spsIn)
|
||||
}
|
||||
if len(ppsOut) == 0 {
|
||||
ppsOut = sanitizeParameterSets(ppsIn)
|
||||
}
|
||||
return spsOut, ppsOut
|
||||
}
|
||||
|
||||
// normalizeH265ParameterSets splits Annex B blobs and extracts VPS/SPS/PPS NALUs.
|
||||
func normalizeH265ParameterSets(vpsIn [][]byte, spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte, [][]byte) {
|
||||
all := make([][]byte, 0, len(vpsIn)+len(spsIn)+len(ppsIn))
|
||||
all = append(all, vpsIn...)
|
||||
all = append(all, spsIn...)
|
||||
all = append(all, ppsIn...)
|
||||
var vpsOut [][]byte
|
||||
var spsOut [][]byte
|
||||
var ppsOut [][]byte
|
||||
for _, blob := range all {
|
||||
for _, nalu := range splitParamSetNALUs(blob) {
|
||||
nalu = removeAnnexBStartCode(nalu)
|
||||
if len(nalu) == 0 {
|
||||
continue
|
||||
}
|
||||
typ := (nalu[0] >> 1) & 0x3F
|
||||
switch typ {
|
||||
case 32:
|
||||
vpsOut = append(vpsOut, nalu)
|
||||
case 33:
|
||||
spsOut = append(spsOut, nalu)
|
||||
case 34:
|
||||
ppsOut = append(ppsOut, nalu)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(vpsOut) == 0 {
|
||||
vpsOut = sanitizeParameterSets(vpsIn)
|
||||
}
|
||||
if len(spsOut) == 0 {
|
||||
spsOut = sanitizeParameterSets(spsIn)
|
||||
}
|
||||
if len(ppsOut) == 0 {
|
||||
ppsOut = sanitizeParameterSets(ppsIn)
|
||||
}
|
||||
return vpsOut, spsOut, ppsOut
|
||||
}
|
||||
|
||||
// splitParamSetNALUs splits Annex B parameter set blobs; raw NALUs are returned as-is.
|
||||
func splitParamSetNALUs(blob []byte) [][]byte {
|
||||
if len(blob) == 0 {
|
||||
return nil
|
||||
}
|
||||
if findStartCode(blob, 0) >= 0 {
|
||||
return splitNALUs(blob)
|
||||
}
|
||||
return [][]byte{blob}
|
||||
}
|
||||
|
||||
func formatNaluDebug(nalus [][]byte) string {
|
||||
if len(nalus) == 0 {
|
||||
return "none"
|
||||
}
|
||||
parts := make([]string, 0, len(nalus))
|
||||
for _, nalu := range nalus {
|
||||
if len(nalu) == 0 {
|
||||
parts = append(parts, "len=0")
|
||||
continue
|
||||
}
|
||||
max := 8
|
||||
if len(nalu) < max {
|
||||
max = len(nalu)
|
||||
}
|
||||
parts = append(parts, fmt.Sprintf("len=%d head=%x", len(nalu), nalu[:max]))
|
||||
}
|
||||
return strings.Join(parts, "; ")
|
||||
}
|
||||
|
||||
func addAVCDescriptorFallback(trak *mp4ff.TrakBox, spsNALUs, ppsNALUs [][]byte, width, height uint16) error {
|
||||
if trak == nil || trak.Mdia == nil || trak.Mdia.Minf == nil || trak.Mdia.Minf.Stbl == nil || trak.Mdia.Minf.Stbl.Stsd == nil {
|
||||
return fmt.Errorf("missing trak stsd")
|
||||
}
|
||||
if len(spsNALUs) == 0 {
|
||||
return fmt.Errorf("no SPS NALU available")
|
||||
}
|
||||
decConfRec, err := buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if width == 0 && trak.Tkhd != nil {
|
||||
width = uint16(uint32(trak.Tkhd.Width) >> 16)
|
||||
}
|
||||
if height == 0 && trak.Tkhd != nil {
|
||||
height = uint16(uint32(trak.Tkhd.Height) >> 16)
|
||||
}
|
||||
if width > 0 && height > 0 && trak.Tkhd != nil {
|
||||
trak.Tkhd.Width = mp4ff.Fixed32(uint32(width) << 16)
|
||||
trak.Tkhd.Height = mp4ff.Fixed32(uint32(height) << 16)
|
||||
}
|
||||
avcC := &mp4ff.AvcCBox{DecConfRec: *decConfRec}
|
||||
avcx := mp4ff.CreateVisualSampleEntryBox("avc1", width, height, avcC)
|
||||
trak.Mdia.Minf.Stbl.Stsd.AddChild(avcx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs [][]byte) (*avc.DecConfRec, error) {
|
||||
if len(spsNALUs) == 0 {
|
||||
return nil, fmt.Errorf("no SPS NALU available")
|
||||
}
|
||||
sps := spsNALUs[0]
|
||||
if len(sps) < 4 {
|
||||
return nil, fmt.Errorf("SPS too short: len=%d", len(sps))
|
||||
}
|
||||
// SPS NALU: byte 0 is NAL header, next 3 bytes are profile/compat/level.
|
||||
dec := &avc.DecConfRec{
|
||||
AVCProfileIndication: sps[1],
|
||||
ProfileCompatibility: sps[2],
|
||||
AVCLevelIndication: sps[3],
|
||||
SPSnalus: spsNALUs,
|
||||
PPSnalus: ppsNALUs,
|
||||
ChromaFormat: 1,
|
||||
BitDepthLumaMinus1: 0,
|
||||
BitDepthChromaMinus1: 0,
|
||||
NumSPSExt: 0,
|
||||
NoTrailingInfo: true,
|
||||
}
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// splitNALUs splits Annex B data into raw NAL units without start codes.
|
||||
func splitNALUs(data []byte) [][]byte {
|
||||
var nalus [][]byte
|
||||
|
||||
@@ -31,6 +31,7 @@ const (
|
||||
// Timeouts and intervals
|
||||
keepAliveTimeout = 15 * time.Second
|
||||
defaultTimeout = 10 * time.Second
|
||||
maxLivePacketAge = 1500 * time.Millisecond
|
||||
|
||||
// Track identifiers
|
||||
trackStreamID = "kerberos-stream"
|
||||
@@ -50,6 +51,7 @@ type peerConnectionWrapper struct {
|
||||
cancelCtx context.CancelFunc
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
connected atomic.Bool
|
||||
}
|
||||
|
||||
var globalConnectionManager = NewConnectionManager()
|
||||
@@ -88,22 +90,41 @@ func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
|
||||
}
|
||||
|
||||
// AddPeerConnection adds a peer connection to the manager
|
||||
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
|
||||
func (cm *ConnectionManager) AddPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
cm.peerConnections[sessionID] = wrapper
|
||||
cm.peerConnections[sessionKey] = wrapper
|
||||
}
|
||||
|
||||
// RemovePeerConnection removes a peer connection from the manager
|
||||
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
|
||||
func (cm *ConnectionManager) RemovePeerConnection(sessionKey string) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
if wrapper, exists := cm.peerConnections[sessionID]; exists {
|
||||
if wrapper, exists := cm.peerConnections[sessionKey]; exists {
|
||||
if wrapper.cancelCtx != nil {
|
||||
wrapper.cancelCtx()
|
||||
}
|
||||
delete(cm.peerConnections, sessionID)
|
||||
delete(cm.peerConnections, sessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
// QueueCandidate safely queues a candidate for a session without racing with channel closure.
|
||||
func (cm *ConnectionManager) QueueCandidate(sessionKey string, candidate string) bool {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
ch, exists := cm.candidateChannels[sessionKey]
|
||||
if !exists {
|
||||
ch = make(chan string, candidateChannelBuffer)
|
||||
cm.candidateChannels[sessionKey] = ch
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- candidate:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,6 +143,26 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
|
||||
return atomic.AddInt64(&cm.peerConnectionCount, -1)
|
||||
}
|
||||
|
||||
func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
wrapper.closeOnce.Do(func() {
|
||||
if wrapper.connected.Swap(false) {
|
||||
count := globalConnectionManager.DecrementPeerCount()
|
||||
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
|
||||
}
|
||||
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
|
||||
if wrapper.conn != nil {
|
||||
if err := wrapper.conn.Close(); err != nil {
|
||||
log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
globalConnectionManager.RemovePeerConnection(sessionKey)
|
||||
close(wrapper.done)
|
||||
})
|
||||
}
|
||||
|
||||
type WebRTC struct {
|
||||
Name string
|
||||
StunServers []string
|
||||
@@ -161,16 +202,27 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
|
||||
}
|
||||
|
||||
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
|
||||
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
|
||||
|
||||
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
|
||||
select {
|
||||
case ch <- candidate.Candidate:
|
||||
default:
|
||||
if !globalConnectionManager.QueueCandidate(key, candidate.Candidate) {
|
||||
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
|
||||
}
|
||||
}
|
||||
|
||||
func decodeICECandidate(candidate string) (pionWebRTC.ICECandidateInit, error) {
|
||||
if candidate == "" {
|
||||
return pionWebRTC.ICECandidateInit{}, io.EOF
|
||||
}
|
||||
|
||||
var candidateInit pionWebRTC.ICECandidateInit
|
||||
if err := json.Unmarshal([]byte(candidate), &candidateInit); err == nil {
|
||||
if candidateInit.Candidate != "" {
|
||||
return candidateInit, nil
|
||||
}
|
||||
}
|
||||
|
||||
return pionWebRTC.ICECandidateInit{Candidate: candidate}, nil
|
||||
}
|
||||
|
||||
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
|
||||
if err := pionWebRTC.ConfigureNack(mediaEngine, interceptorRegistry); err != nil {
|
||||
return err
|
||||
@@ -273,7 +325,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if videoTrack != nil {
|
||||
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
|
||||
cancel()
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -306,7 +358,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if audioTrack != nil {
|
||||
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
|
||||
cancel()
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -339,28 +391,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
|
||||
|
||||
switch connectionState {
|
||||
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
|
||||
wrapper.closeOnce.Do(func() {
|
||||
count := globalConnectionManager.DecrementPeerCount()
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
|
||||
|
||||
// Clean up resources
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
|
||||
if err := peerConnection.Close(); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
|
||||
}
|
||||
|
||||
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
|
||||
close(wrapper.done)
|
||||
})
|
||||
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed:
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
|
||||
case pionWebRTC.PeerConnectionStateConnected:
|
||||
count := globalConnectionManager.IncrementPeerCount()
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
|
||||
|
||||
case pionWebRTC.PeerConnectionStateFailed:
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
|
||||
if wrapper.connected.CompareAndSwap(false, true) {
|
||||
count := globalConnectionManager.IncrementPeerCount()
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@@ -379,27 +417,21 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
return
|
||||
}
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
||||
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
|
||||
candidateInit, decodeErr := decodeICECandidate(candidate)
|
||||
if decodeErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
|
||||
continue
|
||||
}
|
||||
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
|
||||
}
|
||||
|
||||
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
||||
// The other peer will add this candidate by calling AddICECandidate
|
||||
// The other peer will add this candidate by calling AddICECandidate.
|
||||
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
|
||||
var hasRelayCandidates bool
|
||||
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
|
||||
|
||||
@@ -444,8 +476,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candateBinary, err := json.Marshal(candidateJSON)
|
||||
if err == nil {
|
||||
valueMap["candidate"] = string(candateBinary)
|
||||
// SDP is not needed to be send..
|
||||
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
|
||||
} else {
|
||||
@@ -469,35 +499,52 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}
|
||||
})
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
// Store peer connection in manager
|
||||
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
|
||||
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
|
||||
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
|
||||
|
||||
// We'll send the candidate to the hub
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Action: "receive-hd-answer",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
|
||||
|
||||
// We'll send the candidate to the hub
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Action: "receive-hd-answer",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
|
||||
token.Wait()
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
|
||||
}
|
||||
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
|
||||
token.Wait()
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error())
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to decode remote session description: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,6 +586,7 @@ type streamState struct {
|
||||
lastKeepAlive int64
|
||||
peerCount int64
|
||||
start bool
|
||||
catchingUp bool
|
||||
receivedKeyFrame bool
|
||||
lastAudioSample *pionMedia.Sample
|
||||
lastVideoSample *pionMedia.Sample
|
||||
@@ -627,6 +675,41 @@ func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.Tr
|
||||
}
|
||||
}
|
||||
|
||||
func sampleTimestamp(pkt packets.Packet) uint32 {
|
||||
if pkt.TimeLegacy > 0 {
|
||||
return uint32(pkt.TimeLegacy.Milliseconds())
|
||||
}
|
||||
|
||||
if pkt.Time > 0 {
|
||||
return uint32(pkt.Time)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback time.Duration) time.Duration {
|
||||
if current.TimeLegacy > 0 {
|
||||
currentDurationMs := current.TimeLegacy.Milliseconds()
|
||||
previousDurationMs := int64(previousTimestamp)
|
||||
if currentDurationMs > previousDurationMs {
|
||||
duration := time.Duration(currentDurationMs-previousDurationMs) * time.Millisecond
|
||||
if duration > 0 {
|
||||
return duration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
currentTimestamp := sampleTimestamp(current)
|
||||
if currentTimestamp > previousTimestamp {
|
||||
duration := time.Duration(currentTimestamp-previousTimestamp) * time.Millisecond
|
||||
if duration > 0 {
|
||||
return duration
|
||||
}
|
||||
}
|
||||
|
||||
return fallback
|
||||
}
|
||||
|
||||
// processVideoPacket processes a video packet and writes samples to the track
|
||||
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
|
||||
if videoTrack == nil {
|
||||
@@ -642,7 +725,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
|
||||
return
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if config.Capture.ForwardWebRTC == "true" {
|
||||
// Remote forwarding not yet implemented
|
||||
@@ -651,8 +734,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
|
||||
}
|
||||
|
||||
if state.lastVideoSample != nil {
|
||||
duration := sample.PacketTimestamp - state.lastVideoSample.PacketTimestamp
|
||||
state.lastVideoSample.Duration = time.Duration(duration) * time.Millisecond
|
||||
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
|
||||
|
||||
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
|
||||
@@ -674,11 +756,10 @@ func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pion
|
||||
return
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if state.lastAudioSample != nil {
|
||||
duration := sample.PacketTimestamp - state.lastAudioSample.PacketTimestamp
|
||||
state.lastAudioSample.Duration = time.Duration(duration) * time.Millisecond
|
||||
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
|
||||
|
||||
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
|
||||
@@ -688,6 +769,15 @@ func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pion
|
||||
state.lastAudioSample = &sample
|
||||
}
|
||||
|
||||
func shouldDropPacketForLatency(pkt packets.Packet) bool {
|
||||
if pkt.CurrentTime == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
age := time.Since(time.UnixMilli(pkt.CurrentTime))
|
||||
return age > maxLivePacketAge
|
||||
}
|
||||
|
||||
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
|
||||
|
||||
config := configuration.Config
|
||||
@@ -747,6 +837,31 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
continue
|
||||
}
|
||||
|
||||
// Keep live WebRTC close to realtime.
|
||||
// If audio+video load makes this consumer fall behind, skip old packets and
|
||||
// wait for a recent keyframe before resuming video.
|
||||
if shouldDropPacketForLatency(pkt) {
|
||||
if !state.catchingUp {
|
||||
log.Log.Warning("webrtc.main.WriteToTrack(): stream is lagging behind, dropping old packets until the next recent keyframe")
|
||||
}
|
||||
state.catchingUp = true
|
||||
state.start = false
|
||||
state.receivedKeyFrame = false
|
||||
state.lastAudioSample = nil
|
||||
state.lastVideoSample = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if state.catchingUp {
|
||||
if !(pkt.IsVideo && pkt.IsKeyFrame) {
|
||||
continue
|
||||
}
|
||||
state.catchingUp = false
|
||||
state.start = false
|
||||
state.receivedKeyFrame = false
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): caught up with live stream at a recent keyframe")
|
||||
}
|
||||
|
||||
// Wait for first keyframe before processing
|
||||
if !state.receivedKeyFrame {
|
||||
if pkt.IsKeyFrame {
|
||||
|
||||
@@ -7,6 +7,7 @@ import './ImageCanvas.css';
|
||||
|
||||
class ImageCanvas extends React.Component {
|
||||
componentDidMount() {
|
||||
this.isUnmounted = false;
|
||||
this.width = 0;
|
||||
this.height = 0;
|
||||
|
||||
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
|
||||
|
||||
const { image } = this.props;
|
||||
this.loadImage(image, (img) => {
|
||||
if (this.isUnmounted || !this.editor) {
|
||||
return;
|
||||
}
|
||||
if (this.width !== img.width || this.height !== img.height) {
|
||||
this.width = img.width;
|
||||
this.height = img.height;
|
||||
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
|
||||
componentDidUpdate() {
|
||||
const { image } = this.props;
|
||||
this.loadImage(image, (img) => {
|
||||
if (this.isUnmounted || !this.editor) {
|
||||
return;
|
||||
}
|
||||
if (this.width !== img.width || this.height !== img.height) {
|
||||
this.width = img.width;
|
||||
this.height = img.height;
|
||||
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
|
||||
});
|
||||
}
|
||||
|
||||
componentWillUnmount() {
|
||||
this.isUnmounted = true;
|
||||
|
||||
if (this.pendingImage) {
|
||||
this.pendingImage.onload = null;
|
||||
this.pendingImage.src = '';
|
||||
this.pendingImage = null;
|
||||
}
|
||||
|
||||
if (this.editor) {
|
||||
this.editor.onSelectionEnd = null;
|
||||
this.editor.onRegionMoveEnd = null;
|
||||
this.editor.onRegionDelete = null;
|
||||
|
||||
if (this.editor.RM) {
|
||||
this.editor.RM.deleteAllRegions();
|
||||
}
|
||||
|
||||
if (typeof this.editor.dispose === 'function') {
|
||||
this.editor.dispose();
|
||||
} else if (typeof this.editor.destroy === 'function') {
|
||||
this.editor.destroy();
|
||||
}
|
||||
|
||||
this.editor = null;
|
||||
}
|
||||
|
||||
if (this.toolbarContainer) {
|
||||
this.toolbarContainer.innerHTML = '';
|
||||
this.toolbarContainer = null;
|
||||
}
|
||||
|
||||
if (this.editorContainer) {
|
||||
this.editorContainer.innerHTML = '';
|
||||
this.editorContainer = null;
|
||||
}
|
||||
}
|
||||
|
||||
loadData = (image) => {
|
||||
if (!this.editor) {
|
||||
return;
|
||||
}
|
||||
|
||||
const w = image.width;
|
||||
const h = image.height;
|
||||
|
||||
this.editor.addContentSource(image).then(() => {
|
||||
if (this.isUnmounted || !this.editor) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add exisiting polygons
|
||||
this.editor.RM.deleteAllRegions();
|
||||
const { polygons } = this.props;
|
||||
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
loadImage = (path, onready) => {
|
||||
if (this.pendingImage) {
|
||||
this.pendingImage.onload = null;
|
||||
}
|
||||
|
||||
const image = new Image();
|
||||
image.src = path;
|
||||
image.addEventListener('load', (e) => {
|
||||
this.pendingImage = image;
|
||||
image.onload = (e) => {
|
||||
if (this.pendingImage === image) {
|
||||
this.pendingImage = null;
|
||||
}
|
||||
onready(e.target);
|
||||
});
|
||||
};
|
||||
image.src = path;
|
||||
};
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
|
||||
@@ -38,16 +38,14 @@ class Dashboard extends React.Component {
|
||||
initialised: false,
|
||||
};
|
||||
this.initialiseLiveview = this.initialiseLiveview.bind(this);
|
||||
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
|
||||
}
|
||||
|
||||
componentDidMount() {
|
||||
const liveview = document.getElementsByClassName('videocard-video');
|
||||
if (liveview && liveview.length > 0) {
|
||||
liveview[0].addEventListener('load', () => {
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
});
|
||||
[this.liveviewElement] = liveview;
|
||||
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
|
||||
}
|
||||
this.initialiseLiveview();
|
||||
}
|
||||
@@ -57,13 +55,14 @@ class Dashboard extends React.Component {
|
||||
}
|
||||
|
||||
componentWillUnmount() {
|
||||
const liveview = document.getElementsByClassName('videocard-video');
|
||||
if (liveview && liveview.length > 0) {
|
||||
liveview[0].remove();
|
||||
if (this.liveviewElement) {
|
||||
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
|
||||
this.liveviewElement = null;
|
||||
}
|
||||
|
||||
if (this.requestStreamSubscription) {
|
||||
this.requestStreamSubscription.unsubscribe();
|
||||
this.requestStreamSubscription = null;
|
||||
}
|
||||
const { dispatchSend } = this.props;
|
||||
const message = {
|
||||
@@ -72,6 +71,12 @@ class Dashboard extends React.Component {
|
||||
dispatchSend(message);
|
||||
}
|
||||
|
||||
handleLiveviewLoad() {
|
||||
this.setState({
|
||||
liveviewLoaded: true,
|
||||
});
|
||||
}
|
||||
|
||||
handleClose() {
|
||||
this.setState({
|
||||
open: false,
|
||||
|
||||
@@ -159,7 +159,10 @@ class Settings extends React.Component {
|
||||
|
||||
componentWillUnmount() {
|
||||
document.removeEventListener('keydown', this.escFunction, false);
|
||||
clearInterval(this.interval);
|
||||
if (this.requestStreamSubscription) {
|
||||
this.requestStreamSubscription.unsubscribe();
|
||||
this.requestStreamSubscription = null;
|
||||
}
|
||||
|
||||
const { dispatchSend } = this.props;
|
||||
const message = {
|
||||
|
||||
Reference in New Issue
Block a user