Compare commits

...

3 Commits

Author SHA1 Message Date
Cedric Verstraeten
54bc1989f9 fix: update locking webrtc 2023-11-23 21:17:39 +01:00
Cedric Verstraeten
94b71a0868 fix: enabling backchannel on the mainstream 2023-11-20 09:57:55 +01:00
Cedric Verstraeten
c071057eec hotfix: do fallback without backchannel if camera didnt support it, some cameras such as Dahua will fail on the header. 2023-11-20 09:35:41 +01:00
9 changed files with 76 additions and 61 deletions

View File

@@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery
go 1.19
//replace github.com/kerberos-io/joy4 v1.0.60 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.63 => ../../../../github.com/kerberos-io/joy4
// replace github.com/kerberos-io/onvif v0.0.6 => ../../../../github.com/kerberos-io/onvif
@@ -26,7 +26,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.63
github.com/kerberos-io/joy4 v1.0.64
github.com/kerberos-io/onvif v0.0.7
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e

View File

@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.63 h1:1VucVzE+WojpWWVKxpVpjmnArnZ2XIQSqn9Sldm6PrE=
github.com/kerberos-io/joy4 v1.0.63/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.64 h1:gTUSotHSOhp9mNqEecgq88tQHvpj7TjmrvPUsPm0idg=
github.com/kerberos-io/joy4 v1.0.64/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.7 h1:LIrXjTH7G2W9DN69xZeJSB0uS3W1+C3huFO8kTqx7/A=
github.com/kerberos-io/onvif v0.0.7/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@@ -16,12 +16,27 @@ import (
"github.com/kerberos-io/joy4/format"
)
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
func OpenRTSP(ctx context.Context, url string, withBackChannel bool) (av.DemuxCloser, []av.CodecData, error) {
format.RegisterAll()
infile, err := avutil.Open(ctx, url)
// Try with backchannel first (if variable is set to true)
// If set to true, it will try to open the stream with a backchannel
// If fails we will try again (see below).
infile, err := avutil.Open(ctx, url, withBackChannel)
if err == nil {
streams, errstreams := infile.Streams()
return infile, streams, errstreams
if len(streams) > 0 {
return infile, streams, errstreams
} else {
// Try again without backchannel
log.Log.Info("OpenRTSP: trying without backchannel")
withBackChannel = false
infile, err := avutil.Open(ctx, url, withBackChannel)
if err == nil {
streams, errstreams := infile.Streams()
return infile, streams, errstreams
}
}
}
return nil, []av.CodecData{}, err
}

View File

@@ -469,7 +469,7 @@ func VerifyCamera(c *gin.Context) {
if streamType == "secondary" {
rtspUrl = cameraStreams.SubRTSP
}
_, codecs, err := OpenRTSP(ctx, rtspUrl)
_, codecs, err := OpenRTSP(ctx, rtspUrl, true)
if err == nil {
videoIdx := -1

View File

@@ -601,14 +601,7 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.SessionID
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key])
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
}
}

View File

@@ -118,9 +118,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
status := "not started"
// Currently only support H264 encoded cameras, this will change.
// Establishing the camera connection
// Establishing the camera connection without backchannel if no substream
rtspUrl := config.Capture.IPCamera.RTSP
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
withBackChannel := true
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl, withBackChannel)
// We will initialise the camera settings object
// so we can check if the camera settings have changed, and we need
@@ -157,7 +158,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
subStreamEnabled := false
subRtspUrl := config.Capture.IPCamera.SubRTSP
if subRtspUrl != "" && subRtspUrl != rtspUrl {
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
withBackChannel := false
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl, withBackChannel) // We'll try to enable backchannel for the substream.
if err == nil {
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
subStreamEnabled = true

View File

@@ -751,24 +751,6 @@ func GoToPresetFromDevice(device *onvif.Device, presetName string) error {
return err
}
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
xmlBytes := bytes.NewBufferString(xmlBody)
decodedXML := xml.NewDecoder(xmlBytes)
for {
token, err := decodedXML.Token()
if err != nil {
break
}
switch et := token.(type) {
case xml.StartElement:
if et.Name.Local == nodeName {
return decodedXML, &et, nil
}
}
}
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
}
func GetPTZFunctionsFromDevice(configurations ptz.GetConfigurationsResponse) ([]string, bool, bool) {
var functions []string
canZoom := false
@@ -854,3 +836,21 @@ func VerifyOnvifConnection(c *gin.Context) {
})
}
}
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
xmlBytes := bytes.NewBufferString(xmlBody)
decodedXML := xml.NewDecoder(xmlBytes)
for {
token, err := decodedXML.Token()
if err != nil {
break
}
switch et := token.(type) {
case xml.StartElement:
if et.Name.Local == nodeName {
return decodedXML, &et, nil
}
}
}
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
}

View File

@@ -478,7 +478,7 @@ func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload mo
if communication.CameraConnected {
// Register candidate channel
key := configuration.Config.Key + "/" + receiveHDCandidatesPayload.SessionID
webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
go webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
} else {
log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.")
}

View File

@@ -88,11 +88,8 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
// Set lock
CandidatesMutex.Lock()
defer CandidatesMutex.Unlock()
channel := CandidateArrays[key]
if channel == nil {
channel = make(chan string)
@@ -100,9 +97,10 @@ func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload)
}
log.Log.Info("HandleReceiveHDCandidates: " + candidate.Candidate)
channel <- candidate.Candidate
CandidatesMutex.Unlock()
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) {
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
config := configuration.Config
deviceKey := config.Key
@@ -111,6 +109,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword
// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
CandidatesMutex.Lock()
_, ok := CandidateArrays[sessionKey]
if !ok {
CandidateArrays[sessionKey] = make(chan string)
}
CandidatesMutex.Unlock()
// Set variables
hubKey := handshake.HubKey
sessionDescription := handshake.SessionDescription
@@ -147,35 +154,40 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if err == nil && peerConnection != nil {
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
panic(err)
//panic(err)
}
if _, err = peerConnection.AddTrack(audioTrack); err != nil {
panic(err)
//panic(err)
}
if err != nil {
panic(err)
//panic(err)
}
peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) {
if connectionState == pionWebRTC.ICEConnectionStateDisconnected {
CandidatesMutex.Lock()
defer CandidatesMutex.Unlock()
atomic.AddInt64(&peerConnectionCount, -1)
// Set lock
CandidatesMutex.Lock()
peerConnections[handshake.SessionID] = nil
close(candidates)
_, ok := CandidateArrays[sessionKey]
if ok {
close(CandidateArrays[sessionKey])
}
CandidatesMutex.Unlock()
close(w.PacketsCount)
if err := peerConnection.Close(); err != nil {
panic(err)
//panic(err)
}
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
atomic.AddInt64(&peerConnectionCount, 1)
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
// Iterate over the candidates and send them to the remote client
// Non blocking channel
for candidate := range candidates {
for candidate := range CandidateArrays[sessionKey] {
log.Log.Info("InitializeWebRTCConnection: Received candidate.")
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error())
@@ -188,29 +200,22 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
//panic(err)
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
//panic(err)
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
panic(err)
//panic(err)
}
// When an ICE candidate is available send to the other Pion instance
// the other Pion instance will add this candidate by calling AddICECandidate
var candidatesMux sync.Mutex
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
if candidate == nil {
return
}
candidatesMux.Lock()
defer candidatesMux.Unlock()
// Create a config map
valueMap := make(map[string]interface{})
candateJSON := candidate.ToJSON()