|
|
|
|
@@ -1,6 +1,7 @@
|
|
|
|
|
package webrtc
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/base64"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"io"
|
|
|
|
|
@@ -22,13 +23,104 @@ import (
|
|
|
|
|
pionMedia "github.com/pion/webrtc/v4/pkg/media"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
CandidatesMutex sync.Mutex
|
|
|
|
|
CandidateArrays map[string](chan string)
|
|
|
|
|
peerConnectionCount int64
|
|
|
|
|
peerConnections map[string]*pionWebRTC.PeerConnection
|
|
|
|
|
const (
|
|
|
|
|
// Channel buffer sizes
|
|
|
|
|
candidateChannelBuffer = 100
|
|
|
|
|
rtcpBufferSize = 1500
|
|
|
|
|
|
|
|
|
|
// Timeouts and intervals
|
|
|
|
|
keepAliveTimeout = 15 * time.Second
|
|
|
|
|
defaultTimeout = 10 * time.Second
|
|
|
|
|
|
|
|
|
|
// Track identifiers
|
|
|
|
|
trackStreamID = "kerberos-stream"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// ConnectionManager manages WebRTC peer connections in a thread-safe manner
type ConnectionManager struct {
	// mu guards candidateChannels and peerConnections.
	mu sync.RWMutex
	// candidateChannels maps a session key to its buffered ICE-candidate channel.
	candidateChannels map[string]chan string
	// peerConnections maps a session ID to its wrapped peer connection.
	peerConnections map[string]*peerConnectionWrapper
	// peerConnectionCount is the number of active peers; read and written
	// only via sync/atomic (see IncrementPeerCount / DecrementPeerCount).
	peerConnectionCount int64
}
|
|
|
|
|
|
|
|
|
|
// peerConnectionWrapper wraps a peer connection with additional metadata
type peerConnectionWrapper struct {
	// conn is the underlying pion peer connection.
	conn *pionWebRTC.PeerConnection
	// cancelCtx cancels the context driving this connection's helper goroutines
	// (RTCP readers, candidate processor).
	cancelCtx context.CancelFunc
	// done is closed once the connection has been fully torn down.
	done chan struct{}
}
|
|
|
|
|
|
|
|
|
|
var globalConnectionManager = NewConnectionManager()
|
|
|
|
|
|
|
|
|
|
// NewConnectionManager creates a new connection manager
|
|
|
|
|
func NewConnectionManager() *ConnectionManager {
|
|
|
|
|
return &ConnectionManager{
|
|
|
|
|
candidateChannels: make(map[string]chan string),
|
|
|
|
|
peerConnections: make(map[string]*peerConnectionWrapper),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetOrCreateCandidateChannel gets or creates a candidate channel for a session
|
|
|
|
|
func (cm *ConnectionManager) GetOrCreateCandidateChannel(sessionKey string) chan string {
|
|
|
|
|
cm.mu.Lock()
|
|
|
|
|
defer cm.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if ch, exists := cm.candidateChannels[sessionKey]; exists {
|
|
|
|
|
return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ch := make(chan string, candidateChannelBuffer)
|
|
|
|
|
cm.candidateChannels[sessionKey] = ch
|
|
|
|
|
return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CloseCandidateChannel safely closes and removes a candidate channel
|
|
|
|
|
func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
|
|
|
|
|
cm.mu.Lock()
|
|
|
|
|
defer cm.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if ch, exists := cm.candidateChannels[sessionKey]; exists {
|
|
|
|
|
close(ch)
|
|
|
|
|
delete(cm.candidateChannels, sessionKey)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AddPeerConnection adds a peer connection to the manager
|
|
|
|
|
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
|
|
|
|
|
cm.mu.Lock()
|
|
|
|
|
defer cm.mu.Unlock()
|
|
|
|
|
cm.peerConnections[sessionID] = wrapper
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RemovePeerConnection removes a peer connection from the manager
|
|
|
|
|
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
|
|
|
|
|
cm.mu.Lock()
|
|
|
|
|
defer cm.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if wrapper, exists := cm.peerConnections[sessionID]; exists {
|
|
|
|
|
if wrapper.cancelCtx != nil {
|
|
|
|
|
wrapper.cancelCtx()
|
|
|
|
|
}
|
|
|
|
|
delete(cm.peerConnections, sessionID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetPeerConnectionCount returns the current count of active peer connections
|
|
|
|
|
func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
|
|
|
|
|
return atomic.LoadInt64(&cm.peerConnectionCount)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// IncrementPeerCount atomically increments the peer connection count
|
|
|
|
|
func (cm *ConnectionManager) IncrementPeerCount() int64 {
|
|
|
|
|
return atomic.AddInt64(&cm.peerConnectionCount, 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DecrementPeerCount atomically decrements the peer connection count
|
|
|
|
|
func (cm *ConnectionManager) DecrementPeerCount() int64 {
|
|
|
|
|
return atomic.AddInt64(&cm.peerConnectionCount, -1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type WebRTC struct {
|
|
|
|
|
Name string
|
|
|
|
|
StunServers []string
|
|
|
|
|
@@ -46,7 +138,7 @@ func CreateWebRTC(name string, stunServers []string, turnServers []string, turnS
|
|
|
|
|
TurnServers: turnServers,
|
|
|
|
|
TurnServersUsername: turnServersUsername,
|
|
|
|
|
TurnServersCredential: turnServersCredential,
|
|
|
|
|
Timer: time.NewTimer(time.Second * 10),
|
|
|
|
|
Timer: time.NewTimer(defaultTimeout),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -68,19 +160,14 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
|
|
|
|
|
// Set lock
|
|
|
|
|
CandidatesMutex.Lock()
|
|
|
|
|
_, ok := CandidateArrays[key]
|
|
|
|
|
if !ok {
|
|
|
|
|
CandidateArrays[key] = make(chan string, 100)
|
|
|
|
|
}
|
|
|
|
|
log.Log.Info("webrtc.main.HandleReceiveHDCandidates(): " + candidate.Candidate)
|
|
|
|
|
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
|
|
|
|
|
|
|
|
|
|
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
|
|
|
|
|
select {
|
|
|
|
|
case CandidateArrays[key] <- candidate.Candidate:
|
|
|
|
|
case ch <- candidate.Candidate:
|
|
|
|
|
default:
|
|
|
|
|
log.Log.Info("webrtc.main.HandleReceiveHDCandidates(): channel is full.")
|
|
|
|
|
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
|
|
|
|
|
}
|
|
|
|
|
CandidatesMutex.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
|
|
|
|
|
@@ -107,12 +194,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
|
|
|
|
|
|
|
|
|
// We create a channel which will hold the candidates for this session.
|
|
|
|
|
sessionKey := config.Key + "/" + handshake.SessionID
|
|
|
|
|
CandidatesMutex.Lock()
|
|
|
|
|
_, ok := CandidateArrays[sessionKey]
|
|
|
|
|
if !ok {
|
|
|
|
|
CandidateArrays[sessionKey] = make(chan string, 100)
|
|
|
|
|
}
|
|
|
|
|
CandidatesMutex.Unlock()
|
|
|
|
|
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
|
|
|
|
|
|
|
|
|
|
// Set variables
|
|
|
|
|
hubKey := handshake.HubKey
|
|
|
|
|
@@ -178,81 +260,126 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
|
|
|
|
|
|
|
|
|
if err == nil && peerConnection != nil {
|
|
|
|
|
|
|
|
|
|
var videoSender *pionWebRTC.RTPSender = nil
|
|
|
|
|
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding video track: " + err.Error())
|
|
|
|
|
// Create context for this connection
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
wrapper := &peerConnectionWrapper{
|
|
|
|
|
conn: peerConnection,
|
|
|
|
|
cancelCtx: cancel,
|
|
|
|
|
done: make(chan struct{}),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var videoSender *pionWebRTC.RTPSender = nil
|
|
|
|
|
if videoTrack != nil {
|
|
|
|
|
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
|
|
|
|
|
cancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): video track is nil, skipping video")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read incoming RTCP packets
|
|
|
|
|
// Before these packets are returned they are processed by interceptors. For things
|
|
|
|
|
// like NACK this needs to be called.
|
|
|
|
|
go func() {
|
|
|
|
|
rtcpBuf := make([]byte, 1500)
|
|
|
|
|
for {
|
|
|
|
|
if _, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil {
|
|
|
|
|
return
|
|
|
|
|
if videoSender != nil {
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): video RTCP reader stopped")
|
|
|
|
|
}()
|
|
|
|
|
rtcpBuf := make([]byte, rtcpBufferSize)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
if _, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var audioSender *pionWebRTC.RTPSender = nil
|
|
|
|
|
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding audio track: " + err.Error())
|
|
|
|
|
} // Read incoming RTCP packets
|
|
|
|
|
if audioTrack != nil {
|
|
|
|
|
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
|
|
|
|
|
cancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): audio track is nil, skipping audio")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read incoming RTCP packets
|
|
|
|
|
// Before these packets are returned they are processed by interceptors. For things
|
|
|
|
|
// like NACK this needs to be called.
|
|
|
|
|
go func() {
|
|
|
|
|
rtcpBuf := make([]byte, 1500)
|
|
|
|
|
for {
|
|
|
|
|
if _, _, rtcpErr := audioSender.Read(rtcpBuf); rtcpErr != nil {
|
|
|
|
|
return
|
|
|
|
|
if audioSender != nil {
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): audio RTCP reader stopped")
|
|
|
|
|
}()
|
|
|
|
|
rtcpBuf := make([]byte, rtcpBufferSize)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
if _, _, rtcpErr := audioSender.Read(rtcpBuf); rtcpErr != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
|
|
|
|
|
if connectionState == pionWebRTC.PeerConnectionStateDisconnected || connectionState == pionWebRTC.PeerConnectionStateClosed {
|
|
|
|
|
// Set lock
|
|
|
|
|
CandidatesMutex.Lock()
|
|
|
|
|
atomic.AddInt64(&peerConnectionCount, -1)
|
|
|
|
|
_, ok := CandidateArrays[sessionKey]
|
|
|
|
|
if ok {
|
|
|
|
|
close(CandidateArrays[sessionKey])
|
|
|
|
|
delete(CandidateArrays, sessionKey)
|
|
|
|
|
}
|
|
|
|
|
// Not really needed.
|
|
|
|
|
//senders := peerConnection.GetSenders()
|
|
|
|
|
//for _, sender := range senders {
|
|
|
|
|
// if err := peerConnection.RemoveTrack(sender); err != nil {
|
|
|
|
|
// log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while removing track: " + err.Error())
|
|
|
|
|
// }
|
|
|
|
|
//}
|
|
|
|
|
if err := peerConnection.Close(); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while closing peer connection: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
peerConnections[handshake.SessionID] = nil
|
|
|
|
|
delete(peerConnections, handshake.SessionID)
|
|
|
|
|
CandidatesMutex.Unlock()
|
|
|
|
|
} else if connectionState == pionWebRTC.PeerConnectionStateConnected {
|
|
|
|
|
CandidatesMutex.Lock()
|
|
|
|
|
atomic.AddInt64(&peerConnectionCount, 1)
|
|
|
|
|
CandidatesMutex.Unlock()
|
|
|
|
|
} else if connectionState == pionWebRTC.PeerConnectionStateFailed {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICEConnectionStateFailed")
|
|
|
|
|
}
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Number of peers connected (" + strconv.FormatInt(peerConnectionCount, 10) + ")")
|
|
|
|
|
|
|
|
|
|
switch connectionState {
|
|
|
|
|
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
|
|
|
|
|
count := globalConnectionManager.DecrementPeerCount()
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
|
|
|
|
|
|
|
|
|
|
// Clean up resources
|
|
|
|
|
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
|
|
|
|
|
|
|
|
|
if err := peerConnection.Close(); err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
|
|
|
|
|
close(wrapper.done)
|
|
|
|
|
|
|
|
|
|
case pionWebRTC.PeerConnectionStateConnected:
|
|
|
|
|
count := globalConnectionManager.IncrementPeerCount()
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
|
|
|
|
|
|
|
|
|
|
case pionWebRTC.PeerConnectionStateFailed:
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Iterate over the candidates and send them to the remote client
|
|
|
|
|
// Non blocking channe
|
|
|
|
|
for candidate := range CandidateArrays[sessionKey] {
|
|
|
|
|
CandidatesMutex.Lock()
|
|
|
|
|
log.Log.Info(">>>> webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
|
|
|
|
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while adding candidate: " + candidateErr.Error())
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case candidate, ok := <-candidateChannel:
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
|
|
|
|
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
CandidatesMutex.Unlock()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
@@ -270,22 +397,56 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
|
|
|
|
|
|
|
|
|
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
|
|
|
|
// The other peer will add this candidate by calling AddICECandidate
|
|
|
|
|
var hasRelayCandidates bool
|
|
|
|
|
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
|
|
|
|
|
|
|
|
|
|
if candidate == nil {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE gathering complete (candidate is nil)")
|
|
|
|
|
if !hasRelayCandidates {
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): WARNING - No TURN (relay) candidates were gathered! TURN servers: " +
|
|
|
|
|
config.TURNURI + ", Username: " + config.TURNUsername + ", ForceTurn: " + config.ForceTurn)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Log candidate details for debugging
|
|
|
|
|
candidateJSON := candidate.ToJSON()
|
|
|
|
|
candidateStr := candidateJSON.Candidate
|
|
|
|
|
|
|
|
|
|
// Determine candidate type from the candidate string
|
|
|
|
|
candidateType := "unknown"
|
|
|
|
|
if candidateJSON.Candidate != "" {
|
|
|
|
|
switch candidate.Typ {
|
|
|
|
|
case pionWebRTC.ICECandidateTypeRelay:
|
|
|
|
|
candidateType = "relay"
|
|
|
|
|
case pionWebRTC.ICECandidateTypeSrflx:
|
|
|
|
|
candidateType = "srflx"
|
|
|
|
|
case pionWebRTC.ICECandidateTypeHost:
|
|
|
|
|
candidateType = "host"
|
|
|
|
|
case pionWebRTC.ICECandidateTypePrflx:
|
|
|
|
|
candidateType = "prflx"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Track if we received any relay (TURN) candidates
|
|
|
|
|
if candidateType == "relay" {
|
|
|
|
|
hasRelayCandidates = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE candidate received - Type: " + candidateType +
|
|
|
|
|
", Candidate: " + candidateStr)
|
|
|
|
|
|
|
|
|
|
// Create a config map
|
|
|
|
|
valueMap := make(map[string]interface{})
|
|
|
|
|
candateJSON := candidate.ToJSON()
|
|
|
|
|
candateBinary, err := json.Marshal(candateJSON)
|
|
|
|
|
candateBinary, err := json.Marshal(candidateJSON)
|
|
|
|
|
if err == nil {
|
|
|
|
|
valueMap["candidate"] = string(candateBinary)
|
|
|
|
|
// SDP is not needed to be send..
|
|
|
|
|
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
|
|
|
|
valueMap["session_id"] = handshake.SessionID
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
|
|
|
|
|
} else {
|
|
|
|
|
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): something went wrong while marshalling candidate: " + err.Error())
|
|
|
|
|
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We'll send the candidate to the hub
|
|
|
|
|
@@ -305,8 +466,8 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Create a channel which will be used to send candidates to the other peer
|
|
|
|
|
peerConnections[handshake.SessionID] = peerConnection
|
|
|
|
|
// Store peer connection in manager
|
|
|
|
|
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
// Create a config map
|
|
|
|
|
@@ -339,7 +500,11 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
|
|
|
|
|
|
|
|
|
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
|
|
|
|
|
mimeType := pionWebRTC.MimeTypeH264
|
|
|
|
|
outboundVideoTrack, _ := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", "pion124")
|
|
|
|
|
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.NewVideoTrack(): error creating video track: " + err.Error())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return outboundVideoTrack
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -354,161 +519,245 @@ func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
|
|
|
|
|
mimeType = pionWebRTC.MimeTypePCMA
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
outboundAudioTrack, _ := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", "pion124")
|
|
|
|
|
if mimeType == "" {
|
|
|
|
|
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Log.Error("webrtc.main.NewAudioTrack(): error creating audio track: " + err.Error())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return outboundAudioTrack
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// streamState holds state information for the streaming process
type streamState struct {
	// lastKeepAlive is the unix timestamp of the last keepalive received.
	lastKeepAlive int64
	// peerCount is the last reported number of connected peers.
	peerCount int64
	// start becomes true once the first video keyframe has been seen.
	start bool
	// receivedKeyFrame tracks whether a keyframe arrived since the last reset.
	receivedKeyFrame bool
	// lastAudioSample buffers the previous audio sample so its duration can
	// be derived from the next packet's timestamp before it is written.
	lastAudioSample *pionMedia.Sample
	// lastVideoSample buffers the previous video sample for the same
	// duration-derivation purpose.
	lastVideoSample *pionMedia.Sample
}
|
|
|
|
|
|
|
|
|
|
// codecSupport tracks which codecs are available in the stream
type codecSupport struct {
	hasH264      bool // H264 video stream present
	hasPCM_MULAW bool // PCM mu-law audio stream present
	hasAAC       bool // AAC audio stream present (transcoding not yet implemented)
	hasOpus      bool // Opus audio stream present
}
|
|
|
|
|
|
|
|
|
|
// detectCodecs examines the stream to determine which codecs are available
|
|
|
|
|
func detectCodecs(rtspClient capture.RTSPClient) codecSupport {
|
|
|
|
|
support := codecSupport{}
|
|
|
|
|
streams, _ := rtspClient.GetStreams()
|
|
|
|
|
|
|
|
|
|
for _, stream := range streams {
|
|
|
|
|
switch stream.Name {
|
|
|
|
|
case "H264":
|
|
|
|
|
support.hasH264 = true
|
|
|
|
|
case "PCM_MULAW":
|
|
|
|
|
support.hasPCM_MULAW = true
|
|
|
|
|
case "AAC":
|
|
|
|
|
support.hasAAC = true
|
|
|
|
|
case "OPUS":
|
|
|
|
|
support.hasOpus = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return support
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// hasValidCodecs checks if at least one valid video or audio codec is present
|
|
|
|
|
func (cs codecSupport) hasValidCodecs() bool {
|
|
|
|
|
hasVideo := cs.hasH264
|
|
|
|
|
hasAudio := cs.hasPCM_MULAW || cs.hasAAC || cs.hasOpus
|
|
|
|
|
return hasVideo || hasAudio
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// shouldContinueStreaming determines if streaming should continue based on keepalive and peer count
|
|
|
|
|
func shouldContinueStreaming(config models.Config, state *streamState) bool {
|
|
|
|
|
if config.Capture.ForwardWebRTC != "true" {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
now := time.Now().Unix()
|
|
|
|
|
hasTimedOut := (now - state.lastKeepAlive) > int64(keepAliveTimeout.Seconds())
|
|
|
|
|
hasNoPeers := state.peerCount == 0
|
|
|
|
|
|
|
|
|
|
return !hasTimedOut && !hasNoPeers
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateStreamState updates keepalive and peer count from communication channels
|
|
|
|
|
func updateStreamState(communication *models.Communication, state *streamState) {
|
|
|
|
|
select {
|
|
|
|
|
case keepAliveStr := <-communication.HandleLiveHDKeepalive:
|
|
|
|
|
if val, err := strconv.ParseInt(keepAliveStr, 10, 64); err == nil {
|
|
|
|
|
state.lastKeepAlive = val
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case peerCountStr := <-communication.HandleLiveHDPeers:
|
|
|
|
|
if val, err := strconv.ParseInt(peerCountStr, 10, 64); err == nil {
|
|
|
|
|
state.peerCount = val
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// writeFinalSamples writes any remaining buffered samples
|
|
|
|
|
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
|
|
|
|
|
if state.lastVideoSample != nil && videoTrack != nil {
|
|
|
|
|
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final video sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if state.lastAudioSample != nil && audioTrack != nil {
|
|
|
|
|
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final audio sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// processVideoPacket processes a video packet and writes samples to the track
|
|
|
|
|
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
|
|
|
|
|
if videoTrack == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start at the first keyframe
|
|
|
|
|
if pkt.IsKeyFrame {
|
|
|
|
|
state.start = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !state.start {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
|
|
|
|
|
|
|
|
|
if config.Capture.ForwardWebRTC == "true" {
|
|
|
|
|
// Remote forwarding not yet implemented
|
|
|
|
|
log.Log.Debug("webrtc.main.processVideoPacket(): remote forwarding not implemented")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if state.lastVideoSample != nil {
|
|
|
|
|
duration := sample.PacketTimestamp - state.lastVideoSample.PacketTimestamp
|
|
|
|
|
state.lastVideoSample.Duration = time.Duration(duration) * time.Millisecond
|
|
|
|
|
|
|
|
|
|
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
state.lastVideoSample = &sample
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// processAudioPacket processes an audio packet and writes samples to the track
|
|
|
|
|
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
|
|
|
|
|
if audioTrack == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if hasAAC {
|
|
|
|
|
// AAC transcoding not yet implemented
|
|
|
|
|
// TODO: Implement AAC to PCM_MULAW transcoding
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
|
|
|
|
|
|
|
|
|
if state.lastAudioSample != nil {
|
|
|
|
|
duration := sample.PacketTimestamp - state.lastAudioSample.PacketTimestamp
|
|
|
|
|
state.lastAudioSample.Duration = time.Duration(duration) * time.Millisecond
|
|
|
|
|
|
|
|
|
|
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
state.lastAudioSample = &sample
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
|
|
|
|
|
|
|
|
|
|
config := configuration.Config
|
|
|
|
|
|
|
|
|
|
// Make peerconnection map
|
|
|
|
|
peerConnections = make(map[string]*pionWebRTC.PeerConnection)
|
|
|
|
|
|
|
|
|
|
// Set the indexes for the video & audio streams
|
|
|
|
|
// Later when we read a packet we need to figure out which track to send it to.
|
|
|
|
|
hasH264 := false
|
|
|
|
|
hasPCM_MULAW := false
|
|
|
|
|
hasAAC := false
|
|
|
|
|
hasOpus := false
|
|
|
|
|
streams, _ := rtspClient.GetStreams()
|
|
|
|
|
for _, stream := range streams {
|
|
|
|
|
if stream.Name == "H264" {
|
|
|
|
|
hasH264 = true
|
|
|
|
|
} else if stream.Name == "PCM_MULAW" {
|
|
|
|
|
hasPCM_MULAW = true
|
|
|
|
|
} else if stream.Name == "AAC" {
|
|
|
|
|
hasAAC = true
|
|
|
|
|
} else if stream.Name == "OPUS" {
|
|
|
|
|
hasOpus = true
|
|
|
|
|
}
|
|
|
|
|
// Check if at least one track is available
|
|
|
|
|
if videoTrack == nil && audioTrack == nil {
|
|
|
|
|
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !hasH264 && !hasPCM_MULAW && !hasAAC && !hasOpus {
|
|
|
|
|
log.Log.Error("webrtc.main.WriteToTrack(): no valid video codec and audio codec found.")
|
|
|
|
|
} else {
|
|
|
|
|
if config.Capture.TranscodingWebRTC == "true" {
|
|
|
|
|
// Todo..
|
|
|
|
|
} else {
|
|
|
|
|
//log.Log.Info("webrtc.main.WriteToTrack(): not using a transcoder.")
|
|
|
|
|
}
|
|
|
|
|
// Detect available codecs
|
|
|
|
|
codecs := detectCodecs(rtspClient)
|
|
|
|
|
|
|
|
|
|
var cursorError error
|
|
|
|
|
var pkt packets.Packet
|
|
|
|
|
var lastAudioSample *pionMedia.Sample = nil
|
|
|
|
|
var lastVideoSample *pionMedia.Sample = nil
|
|
|
|
|
|
|
|
|
|
start := false
|
|
|
|
|
receivedKeyFrame := false
|
|
|
|
|
lastKeepAlive := "0"
|
|
|
|
|
peerCount := "0"
|
|
|
|
|
|
|
|
|
|
for cursorError == nil {
|
|
|
|
|
|
|
|
|
|
pkt, cursorError = livestreamCursor.ReadPacket()
|
|
|
|
|
|
|
|
|
|
//if config.Capture.ForwardWebRTC != "true" && peerConnectionCount == 0 {
|
|
|
|
|
// start = false
|
|
|
|
|
// receivedKeyFrame = false
|
|
|
|
|
// continue
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case lastKeepAlive = <-communication.HandleLiveHDKeepalive:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case peerCount = <-communication.HandleLiveHDPeers:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
now := time.Now().Unix()
|
|
|
|
|
lastKeepAliveN, _ := strconv.ParseInt(lastKeepAlive, 10, 64)
|
|
|
|
|
hasTimedOut := (now - lastKeepAliveN) > 15 // if longer then no response in 15 sec.
|
|
|
|
|
hasNoPeers := peerCount == "0"
|
|
|
|
|
|
|
|
|
|
if config.Capture.ForwardWebRTC == "true" && (hasTimedOut || hasNoPeers) {
|
|
|
|
|
start = false
|
|
|
|
|
receivedKeyFrame = false
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(pkt.Data) == 0 || pkt.Data == nil {
|
|
|
|
|
receivedKeyFrame = false
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !receivedKeyFrame {
|
|
|
|
|
if pkt.IsKeyFrame {
|
|
|
|
|
receivedKeyFrame = true
|
|
|
|
|
} else {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//if config.Capture.TranscodingWebRTC == "true" {
|
|
|
|
|
// We will transcode the video
|
|
|
|
|
// TODO..
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
if pkt.IsVideo {
|
|
|
|
|
|
|
|
|
|
// Start at the first keyframe
|
|
|
|
|
if pkt.IsKeyFrame {
|
|
|
|
|
start = true
|
|
|
|
|
}
|
|
|
|
|
if start {
|
|
|
|
|
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
|
|
|
|
//sample = pionMedia.Sample{Data: pkt.Data, Duration: time.Second}
|
|
|
|
|
if config.Capture.ForwardWebRTC == "true" {
|
|
|
|
|
// We will send the video to a remote peer
|
|
|
|
|
// TODO..
|
|
|
|
|
} else {
|
|
|
|
|
if lastVideoSample != nil {
|
|
|
|
|
duration := sample.PacketTimestamp - lastVideoSample.PacketTimestamp
|
|
|
|
|
bufferDurationCasted := time.Duration(duration) * time.Millisecond
|
|
|
|
|
lastVideoSample.Duration = bufferDurationCasted
|
|
|
|
|
if err := videoTrack.WriteSample(*lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lastVideoSample = &sample
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if pkt.IsAudio {
|
|
|
|
|
|
|
|
|
|
// @TODO: We need to check if the audio is PCM_MULAW or AAC
|
|
|
|
|
// If AAC we need to transcode it to PCM_MULAW
|
|
|
|
|
// If PCM_MULAW we can send it directly.
|
|
|
|
|
|
|
|
|
|
if hasAAC {
|
|
|
|
|
// We will transcode the audio from AAC to PCM_MULAW
|
|
|
|
|
// Not sure how to do this yet, but we need to use a decoder
|
|
|
|
|
// and then encode it to PCM_MULAW.
|
|
|
|
|
// TODO..
|
|
|
|
|
//d := fdkaac.NewAacDecoder()
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We will send the audio
|
|
|
|
|
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
|
|
|
|
|
|
|
|
|
if lastAudioSample != nil {
|
|
|
|
|
duration := sample.PacketTimestamp - lastAudioSample.PacketTimestamp
|
|
|
|
|
bufferDurationCasted := time.Duration(duration) * time.Millisecond
|
|
|
|
|
lastAudioSample.Duration = bufferDurationCasted
|
|
|
|
|
if err := audioTrack.WriteSample(*lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lastAudioSample = &sample
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !codecs.hasValidCodecs() {
|
|
|
|
|
log.Log.Error("webrtc.main.WriteToTrack(): no valid video or audio codec found")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
peerConnectionCount = 0
|
|
|
|
|
log.Log.Info("webrtc.main.WriteToTrack(): stop writing to track.")
|
|
|
|
|
if config.Capture.TranscodingWebRTC == "true" {
|
|
|
|
|
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initialize streaming state
|
|
|
|
|
state := &streamState{
|
|
|
|
|
lastKeepAlive: time.Now().Unix(),
|
|
|
|
|
peerCount: 0,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
writeFinalSamples(state, videoTrack, audioTrack)
|
|
|
|
|
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
var pkt packets.Packet
|
|
|
|
|
var cursorError error
|
|
|
|
|
|
|
|
|
|
for cursorError == nil {
|
|
|
|
|
pkt, cursorError = livestreamCursor.ReadPacket()
|
|
|
|
|
|
|
|
|
|
if cursorError != nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update state from communication channels
|
|
|
|
|
updateStreamState(communication, state)
|
|
|
|
|
|
|
|
|
|
// Check if we should continue streaming
|
|
|
|
|
if !shouldContinueStreaming(config, state) {
|
|
|
|
|
state.start = false
|
|
|
|
|
state.receivedKeyFrame = false
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Skip empty packets
|
|
|
|
|
if len(pkt.Data) == 0 || pkt.Data == nil {
|
|
|
|
|
state.receivedKeyFrame = false
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for first keyframe before processing
|
|
|
|
|
if !state.receivedKeyFrame {
|
|
|
|
|
if pkt.IsKeyFrame {
|
|
|
|
|
state.receivedKeyFrame = true
|
|
|
|
|
} else {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Process video or audio packets
|
|
|
|
|
if pkt.IsVideo {
|
|
|
|
|
processVideoPacket(pkt, state, videoTrack, config)
|
|
|
|
|
} else if pkt.IsAudio {
|
|
|
|
|
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|