mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-04 10:50:27 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54bc1989f9 | ||
|
|
94b71a0868 | ||
|
|
c071057eec |
@@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery
|
||||
|
||||
go 1.19
|
||||
|
||||
//replace github.com/kerberos-io/joy4 v1.0.60 => ../../../../github.com/kerberos-io/joy4
|
||||
//replace github.com/kerberos-io/joy4 v1.0.63 => ../../../../github.com/kerberos-io/joy4
|
||||
|
||||
// replace github.com/kerberos-io/onvif v0.0.6 => ../../../../github.com/kerberos-io/onvif
|
||||
|
||||
@@ -26,7 +26,7 @@ require (
|
||||
github.com/golang-module/carbon/v2 v2.2.3
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.63
|
||||
github.com/kerberos-io/joy4 v1.0.64
|
||||
github.com/kerberos-io/onvif v0.0.7
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
|
||||
@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.63 h1:1VucVzE+WojpWWVKxpVpjmnArnZ2XIQSqn9Sldm6PrE=
|
||||
github.com/kerberos-io/joy4 v1.0.63/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/joy4 v1.0.64 h1:gTUSotHSOhp9mNqEecgq88tQHvpj7TjmrvPUsPm0idg=
|
||||
github.com/kerberos-io/joy4 v1.0.64/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.7 h1:LIrXjTH7G2W9DN69xZeJSB0uS3W1+C3huFO8kTqx7/A=
|
||||
github.com/kerberos-io/onvif v0.0.7/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
|
||||
@@ -16,12 +16,27 @@ import (
|
||||
"github.com/kerberos-io/joy4/format"
|
||||
)
|
||||
|
||||
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
func OpenRTSP(ctx context.Context, url string, withBackChannel bool) (av.DemuxCloser, []av.CodecData, error) {
|
||||
format.RegisterAll()
|
||||
infile, err := avutil.Open(ctx, url)
|
||||
|
||||
// Try with backchannel first (if variable is set to true)
|
||||
// If set to true, it will try to open the stream with a backchannel
|
||||
// If fails we will try again (see below).
|
||||
infile, err := avutil.Open(ctx, url, withBackChannel)
|
||||
if err == nil {
|
||||
streams, errstreams := infile.Streams()
|
||||
return infile, streams, errstreams
|
||||
if len(streams) > 0 {
|
||||
return infile, streams, errstreams
|
||||
} else {
|
||||
// Try again without backchannel
|
||||
log.Log.Info("OpenRTSP: trying without backchannel")
|
||||
withBackChannel = false
|
||||
infile, err := avutil.Open(ctx, url, withBackChannel)
|
||||
if err == nil {
|
||||
streams, errstreams := infile.Streams()
|
||||
return infile, streams, errstreams
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, []av.CodecData{}, err
|
||||
}
|
||||
|
||||
@@ -469,7 +469,7 @@ func VerifyCamera(c *gin.Context) {
|
||||
if streamType == "secondary" {
|
||||
rtspUrl = cameraStreams.SubRTSP
|
||||
}
|
||||
_, codecs, err := OpenRTSP(ctx, rtspUrl)
|
||||
_, codecs, err := OpenRTSP(ctx, rtspUrl, true)
|
||||
if err == nil {
|
||||
|
||||
videoIdx := -1
|
||||
|
||||
@@ -601,14 +601,7 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
|
||||
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
|
||||
for handshake := range communication.HandleLiveHDHandshake {
|
||||
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
|
||||
key := config.Key + "/" + handshake.SessionID
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
_, ok := webrtc.CandidateArrays[key]
|
||||
if !ok {
|
||||
webrtc.CandidateArrays[key] = make(chan string)
|
||||
}
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key])
|
||||
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,9 +118,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
status := "not started"
|
||||
|
||||
// Currently only support H264 encoded cameras, this will change.
|
||||
// Establishing the camera connection
|
||||
// Establishing the camera connection without backchannel if no substream
|
||||
rtspUrl := config.Capture.IPCamera.RTSP
|
||||
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
|
||||
withBackChannel := true
|
||||
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl, withBackChannel)
|
||||
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
@@ -157,7 +158,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
subStreamEnabled := false
|
||||
subRtspUrl := config.Capture.IPCamera.SubRTSP
|
||||
if subRtspUrl != "" && subRtspUrl != rtspUrl {
|
||||
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
|
||||
withBackChannel := false
|
||||
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl, withBackChannel) // We'll try to enable backchannel for the substream.
|
||||
if err == nil {
|
||||
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
|
||||
subStreamEnabled = true
|
||||
|
||||
@@ -751,24 +751,6 @@ func GoToPresetFromDevice(device *onvif.Device, presetName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
|
||||
xmlBytes := bytes.NewBufferString(xmlBody)
|
||||
decodedXML := xml.NewDecoder(xmlBytes)
|
||||
for {
|
||||
token, err := decodedXML.Token()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch et := token.(type) {
|
||||
case xml.StartElement:
|
||||
if et.Name.Local == nodeName {
|
||||
return decodedXML, &et, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
|
||||
}
|
||||
|
||||
func GetPTZFunctionsFromDevice(configurations ptz.GetConfigurationsResponse) ([]string, bool, bool) {
|
||||
var functions []string
|
||||
canZoom := false
|
||||
@@ -854,3 +836,21 @@ func VerifyOnvifConnection(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
|
||||
xmlBytes := bytes.NewBufferString(xmlBody)
|
||||
decodedXML := xml.NewDecoder(xmlBytes)
|
||||
for {
|
||||
token, err := decodedXML.Token()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch et := token.(type) {
|
||||
case xml.StartElement:
|
||||
if et.Name.Local == nodeName {
|
||||
return decodedXML, &et, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
|
||||
}
|
||||
|
||||
@@ -478,7 +478,7 @@ func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload mo
|
||||
if communication.CameraConnected {
|
||||
// Register candidate channel
|
||||
key := configuration.Config.Key + "/" + receiveHDCandidatesPayload.SessionID
|
||||
webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
|
||||
go webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
|
||||
} else {
|
||||
log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.")
|
||||
}
|
||||
|
||||
@@ -88,11 +88,8 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
|
||||
}
|
||||
|
||||
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
|
||||
|
||||
// Set lock
|
||||
CandidatesMutex.Lock()
|
||||
defer CandidatesMutex.Unlock()
|
||||
|
||||
channel := CandidateArrays[key]
|
||||
if channel == nil {
|
||||
channel = make(chan string)
|
||||
@@ -100,9 +97,10 @@ func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload)
|
||||
}
|
||||
log.Log.Info("HandleReceiveHDCandidates: " + candidate.Candidate)
|
||||
channel <- candidate.Candidate
|
||||
CandidatesMutex.Unlock()
|
||||
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) {
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
|
||||
|
||||
config := configuration.Config
|
||||
deviceKey := config.Key
|
||||
@@ -111,6 +109,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
turnServersUsername := config.TURNUsername
|
||||
turnServersCredential := config.TURNPassword
|
||||
|
||||
// We create a channel which will hold the candidates for this session.
|
||||
sessionKey := config.Key + "/" + handshake.SessionID
|
||||
CandidatesMutex.Lock()
|
||||
_, ok := CandidateArrays[sessionKey]
|
||||
if !ok {
|
||||
CandidateArrays[sessionKey] = make(chan string)
|
||||
}
|
||||
CandidatesMutex.Unlock()
|
||||
|
||||
// Set variables
|
||||
hubKey := handshake.HubKey
|
||||
sessionDescription := handshake.SessionDescription
|
||||
@@ -147,35 +154,40 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if err == nil && peerConnection != nil {
|
||||
|
||||
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
if _, err = peerConnection.AddTrack(audioTrack); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) {
|
||||
if connectionState == pionWebRTC.ICEConnectionStateDisconnected {
|
||||
CandidatesMutex.Lock()
|
||||
defer CandidatesMutex.Unlock()
|
||||
|
||||
atomic.AddInt64(&peerConnectionCount, -1)
|
||||
|
||||
// Set lock
|
||||
CandidatesMutex.Lock()
|
||||
peerConnections[handshake.SessionID] = nil
|
||||
close(candidates)
|
||||
_, ok := CandidateArrays[sessionKey]
|
||||
if ok {
|
||||
close(CandidateArrays[sessionKey])
|
||||
}
|
||||
CandidatesMutex.Unlock()
|
||||
|
||||
close(w.PacketsCount)
|
||||
if err := peerConnection.Close(); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
|
||||
atomic.AddInt64(&peerConnectionCount, 1)
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
|
||||
// Iterate over the candidates and send them to the remote client
|
||||
// Non blocking channel
|
||||
for candidate := range candidates {
|
||||
for candidate := range CandidateArrays[sessionKey] {
|
||||
log.Log.Info("InitializeWebRTCConnection: Received candidate.")
|
||||
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
|
||||
log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error())
|
||||
@@ -188,29 +200,22 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
// When an ICE candidate is available send to the other Pion instance
|
||||
// the other Pion instance will add this candidate by calling AddICECandidate
|
||||
var candidatesMux sync.Mutex
|
||||
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
||||
// The other peer will add this candidate by calling AddICECandidate
|
||||
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
|
||||
if candidate == nil {
|
||||
return
|
||||
}
|
||||
|
||||
candidatesMux.Lock()
|
||||
defer candidatesMux.Unlock()
|
||||
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
candateJSON := candidate.ToJSON()
|
||||
|
||||
Reference in New Issue
Block a user