mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-03 11:50:10 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e74facfb7f | ||
|
|
54bc1989f9 | ||
|
|
94b71a0868 | ||
|
|
c071057eec | ||
|
|
e8a355d992 | ||
|
|
ca84664071 | ||
|
|
dd7fcb31b1 |
@@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery
|
||||
|
||||
go 1.19
|
||||
|
||||
//replace github.com/kerberos-io/joy4 v1.0.58 => ../../../../github.com/kerberos-io/joy4
|
||||
//replace github.com/kerberos-io/joy4 v1.0.63 => ../../../../github.com/kerberos-io/joy4
|
||||
|
||||
// replace github.com/kerberos-io/onvif v0.0.6 => ../../../../github.com/kerberos-io/onvif
|
||||
|
||||
@@ -26,7 +26,7 @@ require (
|
||||
github.com/golang-module/carbon/v2 v2.2.3
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kellydunn/golang-geo v0.7.0
|
||||
github.com/kerberos-io/joy4 v1.0.60
|
||||
github.com/kerberos-io/joy4 v1.0.64
|
||||
github.com/kerberos-io/onvif v0.0.7
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
|
||||
@@ -38,6 +38,7 @@ require (
|
||||
github.com/swaggo/gin-swagger v1.5.3
|
||||
github.com/swaggo/swag v1.8.9
|
||||
github.com/tevino/abool v1.2.0
|
||||
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359
|
||||
go.mongodb.org/mongo-driver v1.7.5
|
||||
gopkg.in/DataDog/dd-trace-go.v1 v1.46.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
|
||||
@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
|
||||
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
|
||||
github.com/kerberos-io/joy4 v1.0.60 h1:W9LMTHw+Lgz4J9/28xCvvVebhcAioup49NqxYVmrH38=
|
||||
github.com/kerberos-io/joy4 v1.0.60/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/joy4 v1.0.64 h1:gTUSotHSOhp9mNqEecgq88tQHvpj7TjmrvPUsPm0idg=
|
||||
github.com/kerberos-io/joy4 v1.0.64/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
|
||||
github.com/kerberos-io/onvif v0.0.7 h1:LIrXjTH7G2W9DN69xZeJSB0uS3W1+C3huFO8kTqx7/A=
|
||||
github.com/kerberos-io/onvif v0.0.7/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
@@ -471,6 +471,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359 h1:P9yeMx2iNJxJqXEwLtMjSwWcD2a0AlFmFByeosMZhLM=
|
||||
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359/go.mod h1:ySLGJD8AQluMQuu5JDvfJrwsBra+8iX1jFsKS8KfB2I=
|
||||
github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
|
||||
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
|
||||
go.mongodb.org/mongo-driver v1.7.5 h1:ny3p0reEpgsR2cfA5cjgwFZg3Cv/ofFh/8jbhGtz9VI=
|
||||
|
||||
@@ -16,12 +16,27 @@ import (
|
||||
"github.com/kerberos-io/joy4/format"
|
||||
)
|
||||
|
||||
func OpenRTSP(ctx context.Context, url string) (av.DemuxCloser, []av.CodecData, error) {
|
||||
func OpenRTSP(ctx context.Context, url string, withBackChannel bool) (av.DemuxCloser, []av.CodecData, error) {
|
||||
format.RegisterAll()
|
||||
infile, err := avutil.Open(ctx, url)
|
||||
|
||||
// Try with backchannel first (if variable is set to true)
|
||||
// If set to true, it will try to open the stream with a backchannel
|
||||
// If fails we will try again (see below).
|
||||
infile, err := avutil.Open(ctx, url, withBackChannel)
|
||||
if err == nil {
|
||||
streams, errstreams := infile.Streams()
|
||||
return infile, streams, errstreams
|
||||
if len(streams) > 0 {
|
||||
return infile, streams, errstreams
|
||||
} else {
|
||||
// Try again without backchannel
|
||||
log.Log.Info("OpenRTSP: trying without backchannel")
|
||||
withBackChannel = false
|
||||
infile, err := avutil.Open(ctx, url, withBackChannel)
|
||||
if err == nil {
|
||||
streams, errstreams := infile.Streams()
|
||||
return infile, streams, errstreams
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, []av.CodecData{}, err
|
||||
}
|
||||
|
||||
@@ -469,7 +469,7 @@ func VerifyCamera(c *gin.Context) {
|
||||
if streamType == "secondary" {
|
||||
rtspUrl = cameraStreams.SubRTSP
|
||||
}
|
||||
_, codecs, err := OpenRTSP(ctx, rtspUrl)
|
||||
_, codecs, err := OpenRTSP(ctx, rtspUrl, true)
|
||||
if err == nil {
|
||||
|
||||
videoIdx := -1
|
||||
|
||||
@@ -278,6 +278,11 @@ loop:
|
||||
cameraConnected = "false"
|
||||
}
|
||||
|
||||
hasBackChannel := "false"
|
||||
if communication.HasBackChannel {
|
||||
hasBackChannel = "true"
|
||||
}
|
||||
|
||||
// We will formated the uptime to a human readable format
|
||||
// this will be used on Kerberos Hub: Uptime -> 1 day and 2 hours.
|
||||
uptimeFormatted := uptimeStart.Format("2006-01-02 15:04:05")
|
||||
@@ -382,13 +387,14 @@ loop:
|
||||
"onvif_presets": "%s",
|
||||
"onvif_presets_list": %s,
|
||||
"cameraConnected": "%s",
|
||||
"hasBackChannel": "%s",
|
||||
"numberoffiles" : "33",
|
||||
"timestamp" : 1564747908,
|
||||
"cameratype" : "IPCamera",
|
||||
"docker" : true,
|
||||
"kios" : false,
|
||||
"raspberrypi" : false
|
||||
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected)
|
||||
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected, hasBackChannel)
|
||||
|
||||
var jsonStr = []byte(object)
|
||||
buffy := bytes.NewBuffer(jsonStr)
|
||||
@@ -595,14 +601,7 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
|
||||
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
|
||||
for handshake := range communication.HandleLiveHDHandshake {
|
||||
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
|
||||
key := config.Key + "/" + handshake.SessionID
|
||||
webrtc.CandidatesMutex.Lock()
|
||||
_, ok := webrtc.CandidateArrays[key]
|
||||
if !ok {
|
||||
webrtc.CandidateArrays[key] = make(chan string)
|
||||
}
|
||||
webrtc.CandidatesMutex.Unlock()
|
||||
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key])
|
||||
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
80
machinery/src/components/Audio.go
Normal file
80
machinery/src/components/Audio.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/log"
|
||||
"github.com/kerberos-io/agent/machinery/src/models"
|
||||
"github.com/kerberos-io/joy4/av"
|
||||
"github.com/zaf/g711"
|
||||
)
|
||||
|
||||
func GetBackChannelAudioCodec(streams []av.CodecData, communication *models.Communication) av.AudioCodecData {
|
||||
for _, stream := range streams {
|
||||
if stream.Type().IsAudio() {
|
||||
if stream.Type().String() == "PCM_MULAW" {
|
||||
pcmuCodec := stream.(av.AudioCodecData)
|
||||
if pcmuCodec.IsBackChannel() {
|
||||
communication.HasBackChannel = true
|
||||
return pcmuCodec
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func WriteAudioToBackchannel(infile av.DemuxCloser, streams []av.CodecData, communication *models.Communication) {
|
||||
log.Log.Info("WriteAudioToBackchannel: looking for backchannel audio codec")
|
||||
|
||||
pcmuCodec := GetBackChannelAudioCodec(streams, communication)
|
||||
if pcmuCodec != nil {
|
||||
log.Log.Info("WriteAudioToBackchannel: found backchannel audio codec")
|
||||
|
||||
length := 0
|
||||
channel := pcmuCodec.GetIndex() * 2 // This is the same calculation as Interleaved property in the SDP file.
|
||||
for audio := range communication.HandleAudio {
|
||||
// Encode PCM to MULAW
|
||||
var bufferUlaw []byte
|
||||
for _, v := range audio.Data {
|
||||
b := g711.EncodeUlawFrame(v)
|
||||
bufferUlaw = append(bufferUlaw, b)
|
||||
}
|
||||
infile.Write(bufferUlaw, channel, uint32(length))
|
||||
length = (length + len(bufferUlaw)) % 65536
|
||||
time.Sleep(128 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
log.Log.Info("WriteAudioToBackchannel: finished")
|
||||
|
||||
}
|
||||
|
||||
func WriteFileToBackChannel(infile av.DemuxCloser) {
|
||||
// Do the warmup!
|
||||
file, err := os.Open("./audiofile.bye")
|
||||
if err != nil {
|
||||
fmt.Println("WriteFileToBackChannel: error opening audiofile.bye file")
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Read file into buffer
|
||||
reader := bufio.NewReader(file)
|
||||
buffer := make([]byte, 1024)
|
||||
|
||||
count := 0
|
||||
for {
|
||||
_, err := reader.Read(buffer)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// Send to backchannel
|
||||
fmt.Println(buffer)
|
||||
infile.Write(buffer, 2, uint32(count))
|
||||
|
||||
count = count + 1024
|
||||
time.Sleep(128 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/kerberos-io/joy4/cgo/ffmpeg"
|
||||
|
||||
//"github.com/youpy/go-wav"
|
||||
|
||||
"github.com/kerberos-io/agent/machinery/src/capture"
|
||||
"github.com/kerberos-io/agent/machinery/src/cloud"
|
||||
"github.com/kerberos-io/agent/machinery/src/computervision"
|
||||
@@ -116,9 +118,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
status := "not started"
|
||||
|
||||
// Currently only support H264 encoded cameras, this will change.
|
||||
// Establishing the camera connection
|
||||
// Establishing the camera connection without backchannel if no substream
|
||||
rtspUrl := config.Capture.IPCamera.RTSP
|
||||
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl)
|
||||
withBackChannel := true
|
||||
infile, streams, err := capture.OpenRTSP(context.Background(), rtspUrl, withBackChannel)
|
||||
|
||||
// We will initialise the camera settings object
|
||||
// so we can check if the camera settings have changed, and we need
|
||||
@@ -155,7 +158,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
subStreamEnabled := false
|
||||
subRtspUrl := config.Capture.IPCamera.SubRTSP
|
||||
if subRtspUrl != "" && subRtspUrl != rtspUrl {
|
||||
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl)
|
||||
withBackChannel := false
|
||||
subInfile, subStreams, err = capture.OpenRTSP(context.Background(), subRtspUrl, withBackChannel) // We'll try to enable backchannel for the substream.
|
||||
if err == nil {
|
||||
log.Log.Info("RunAgent: opened RTSP sub stream " + subRtspUrl)
|
||||
subStreamEnabled = true
|
||||
@@ -244,6 +248,9 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
go capture.HandleSubStream(subInfile, subQueue, communication)
|
||||
}
|
||||
|
||||
// Handle processing of audio
|
||||
communication.HandleAudio = make(chan models.AudioDataPartial)
|
||||
|
||||
// Handle processing of motion
|
||||
communication.HandleMotion = make(chan models.MotionDataPartial, 1)
|
||||
if subStreamEnabled {
|
||||
@@ -285,6 +292,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
// If we reach this point, we have a working RTSP connection.
|
||||
communication.CameraConnected = true
|
||||
|
||||
// We might have a camera with audio backchannel enabled.
|
||||
// Check if we have a stream with a backchannel and is PCMU encoded.
|
||||
go WriteAudioToBackchannel(infile, streams, communication)
|
||||
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
// This will go into a blocking state, once this channel is triggered
|
||||
// the agent will cleanup and restart.
|
||||
@@ -328,6 +339,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
|
||||
}
|
||||
close(communication.HandleMotion)
|
||||
communication.HandleMotion = nil
|
||||
close(communication.HandleAudio)
|
||||
communication.HandleAudio = nil
|
||||
|
||||
// Waiting for some seconds to make sure everything is properly closed.
|
||||
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
|
||||
|
||||
6
machinery/src/models/AudioData.go
Normal file
6
machinery/src/models/AudioData.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package models
|
||||
|
||||
type AudioDataPartial struct {
|
||||
Timestamp int64 `json:"timestamp" bson:"timestamp"`
|
||||
Data []int16 `json:"data" bson:"data"`
|
||||
}
|
||||
@@ -22,6 +22,7 @@ type Communication struct {
|
||||
HandleStream chan string
|
||||
HandleSubStream chan string
|
||||
HandleMotion chan MotionDataPartial
|
||||
HandleAudio chan AudioDataPartial
|
||||
HandleUpload chan string
|
||||
HandleHeartBeat chan string
|
||||
HandleLiveSD chan int64
|
||||
@@ -38,4 +39,5 @@ type Communication struct {
|
||||
SubDecoder *ffmpeg.VideoDecoder
|
||||
Image string
|
||||
CameraConnected bool
|
||||
HasBackChannel bool
|
||||
}
|
||||
|
||||
@@ -107,6 +107,12 @@ type Payload struct {
|
||||
Value map[string]interface{} `json:"value"`
|
||||
}
|
||||
|
||||
// We received a audio input
|
||||
type AudioPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
|
||||
Data []int16 `json:"data"`
|
||||
}
|
||||
|
||||
// We received a recording request, we'll send it to the motion handler.
|
||||
type RecordPayload struct {
|
||||
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
|
||||
|
||||
@@ -751,24 +751,6 @@ func GoToPresetFromDevice(device *onvif.Device, presetName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
|
||||
xmlBytes := bytes.NewBufferString(xmlBody)
|
||||
decodedXML := xml.NewDecoder(xmlBytes)
|
||||
for {
|
||||
token, err := decodedXML.Token()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch et := token.(type) {
|
||||
case xml.StartElement:
|
||||
if et.Name.Local == nodeName {
|
||||
return decodedXML, &et, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
|
||||
}
|
||||
|
||||
func GetPTZFunctionsFromDevice(configurations ptz.GetConfigurationsResponse) ([]string, bool, bool) {
|
||||
var functions []string
|
||||
canZoom := false
|
||||
@@ -854,3 +836,21 @@ func VerifyOnvifConnection(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getXMLNode(xmlBody string, nodeName string) (*xml.Decoder, *xml.StartElement, error) {
|
||||
xmlBytes := bytes.NewBufferString(xmlBody)
|
||||
decodedXML := xml.NewDecoder(xmlBytes)
|
||||
for {
|
||||
token, err := decodedXML.Token()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch et := token.(type) {
|
||||
case xml.StartElement:
|
||||
if et.Name.Local == nodeName {
|
||||
return decodedXML, &et, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.New("error in NodeName - username and password might be wrong")
|
||||
}
|
||||
|
||||
@@ -229,6 +229,8 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
|
||||
switch payload.Action {
|
||||
case "record":
|
||||
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "get-audio-backchannel":
|
||||
go HandleAudio(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "get-ptz-position":
|
||||
go HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
|
||||
case "update-ptz-position":
|
||||
@@ -268,6 +270,23 @@ func HandleRecording(mqttClient mqtt.Client, hubKey string, payload models.Paylo
|
||||
}
|
||||
}
|
||||
|
||||
func HandleAudio(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
// Convert map[string]interface{} to AudioPayload
|
||||
jsonData, _ := json.Marshal(value)
|
||||
var audioPayload models.AudioPayload
|
||||
json.Unmarshal(jsonData, &audioPayload)
|
||||
|
||||
if audioPayload.Timestamp != 0 {
|
||||
audioDataPartial := models.AudioDataPartial{
|
||||
Timestamp: audioPayload.Timestamp,
|
||||
Data: audioPayload.Data,
|
||||
}
|
||||
communication.HandleAudio <- audioDataPartial
|
||||
}
|
||||
}
|
||||
|
||||
func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
|
||||
value := payload.Value
|
||||
|
||||
@@ -457,14 +476,9 @@ func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload mo
|
||||
|
||||
if receiveHDCandidatesPayload.Timestamp != 0 {
|
||||
if communication.CameraConnected {
|
||||
// Register candidate channel
|
||||
key := configuration.Config.Key + "/" + receiveHDCandidatesPayload.SessionID
|
||||
channel := webrtc.CandidateArrays[key]
|
||||
if channel == nil {
|
||||
channel = make(chan string)
|
||||
webrtc.CandidateArrays[key] = channel
|
||||
}
|
||||
log.Log.Info("HandleReceiveHDCandidates: " + receiveHDCandidatesPayload.Candidate)
|
||||
channel <- receiveHDCandidatesPayload.Candidate
|
||||
go webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
|
||||
} else {
|
||||
log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.")
|
||||
}
|
||||
|
||||
@@ -87,7 +87,23 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
|
||||
return offer
|
||||
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) {
|
||||
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
|
||||
// Set lock
|
||||
CandidatesMutex.Lock()
|
||||
_, ok := CandidateArrays[key]
|
||||
if !ok {
|
||||
CandidateArrays[key] = make(chan string)
|
||||
}
|
||||
log.Log.Info("HandleReceiveHDCandidates: " + candidate.Candidate)
|
||||
select {
|
||||
case CandidateArrays[key] <- candidate.Candidate:
|
||||
default:
|
||||
log.Log.Info("HandleReceiveHDCandidates: channel is full.")
|
||||
}
|
||||
CandidatesMutex.Unlock()
|
||||
}
|
||||
|
||||
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
|
||||
|
||||
config := configuration.Config
|
||||
deviceKey := config.Key
|
||||
@@ -96,6 +112,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
turnServersUsername := config.TURNUsername
|
||||
turnServersCredential := config.TURNPassword
|
||||
|
||||
// We create a channel which will hold the candidates for this session.
|
||||
sessionKey := config.Key + "/" + handshake.SessionID
|
||||
CandidatesMutex.Lock()
|
||||
_, ok := CandidateArrays[sessionKey]
|
||||
if !ok {
|
||||
CandidateArrays[sessionKey] = make(chan string)
|
||||
}
|
||||
CandidatesMutex.Unlock()
|
||||
|
||||
// Set variables
|
||||
hubKey := handshake.HubKey
|
||||
sessionDescription := handshake.SessionDescription
|
||||
@@ -132,37 +157,47 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if err == nil && peerConnection != nil {
|
||||
|
||||
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
if _, err = peerConnection.AddTrack(audioTrack); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) {
|
||||
if connectionState == pionWebRTC.ICEConnectionStateDisconnected {
|
||||
atomic.AddInt64(&peerConnectionCount, -1)
|
||||
|
||||
// Set lock
|
||||
CandidatesMutex.Lock()
|
||||
peerConnections[handshake.SessionID] = nil
|
||||
close(candidates)
|
||||
_, ok := CandidateArrays[sessionKey]
|
||||
if ok {
|
||||
close(CandidateArrays[sessionKey])
|
||||
}
|
||||
CandidatesMutex.Unlock()
|
||||
|
||||
close(w.PacketsCount)
|
||||
if err := peerConnection.Close(); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
|
||||
atomic.AddInt64(&peerConnectionCount, 1)
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
|
||||
// Iterate over the candidates and send them to the remote client
|
||||
// Non blocking channel
|
||||
for candidate := range candidates {
|
||||
for candidate := range CandidateArrays[sessionKey] {
|
||||
log.Log.Info("InitializeWebRTCConnection: Received candidate.")
|
||||
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
|
||||
log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error())
|
||||
}
|
||||
}
|
||||
} else if connectionState == pionWebRTC.ICEConnectionStateFailed {
|
||||
|
||||
}
|
||||
log.Log.Info("InitializeWebRTCConnection: connection state changed to: " + connectionState.String())
|
||||
log.Log.Info("InitializeWebRTCConnection: Number of peers connected (" + strconv.FormatInt(peerConnectionCount, 10) + ")")
|
||||
@@ -170,29 +205,22 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
|
||||
// When an ICE candidate is available send to the other Pion instance
|
||||
// the other Pion instance will add this candidate by calling AddICECandidate
|
||||
var candidatesMux sync.Mutex
|
||||
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
||||
// The other peer will add this candidate by calling AddICECandidate
|
||||
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
|
||||
if candidate == nil {
|
||||
return
|
||||
}
|
||||
|
||||
candidatesMux.Lock()
|
||||
defer candidatesMux.Unlock()
|
||||
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
candateJSON := candidate.ToJSON()
|
||||
|
||||
Reference in New Issue
Block a user