mirror of
https://github.com/kerberos-io/agent.git
synced 2026-03-06 17:51:06 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ffa97598b8 | ||
|
|
f5afbf3a63 | ||
|
|
e666695c96 | ||
|
|
55816e4b7b | ||
|
|
016fb51951 | ||
|
|
550a444650 | ||
|
|
4332e43f27 | ||
|
|
fdc3bfb4a4 | ||
|
|
c17d6b7117 | ||
|
|
5d7a8103c0 |
@@ -26,6 +26,7 @@
|
||||
"recording": "true",
|
||||
"snapshots": "true",
|
||||
"liveview": "true",
|
||||
"liveview_chunking": "false",
|
||||
"motion": "true",
|
||||
"postrecording": 20,
|
||||
"prerecording": 10,
|
||||
|
||||
@@ -672,6 +672,7 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
// Check if we need to enable the live stream
|
||||
if config.Capture.Liveview != "false" {
|
||||
|
||||
deviceId := config.Key
|
||||
hubKey := ""
|
||||
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
|
||||
hubKey = config.S3.Publickey
|
||||
@@ -706,39 +707,62 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
img, err := rtspClient.DecodePacket(pkt)
|
||||
if err == nil {
|
||||
bytes, _ := utils.ImageToBytes(&img)
|
||||
//encoded := base64.StdEncoding.EncodeToString(bytes)
|
||||
|
||||
// Split encoded image into chunks of 1kb
|
||||
// This is to prevent the MQTT message to be too large.
|
||||
// By default, bytes are not encoded to base64 here; you are splitting the raw JPEG/PNG bytes.
|
||||
// However, in MQTT and web contexts, binary data may not be handled well, so base64 is often used.
|
||||
// To avoid base64 encoding, just send the raw []byte chunks as you do here.
|
||||
// If you want to avoid base64, make sure the receiver can handle binary payloads.
|
||||
chunking := config.Capture.LiveviewChunking
|
||||
|
||||
chunkSize := 1 * 1024 // 1KB chunks
|
||||
var chunks [][]byte
|
||||
for i := 0; i < len(bytes); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(bytes) {
|
||||
end = len(bytes)
|
||||
if chunking == "true" {
|
||||
|
||||
// Split encoded image into chunks of 2kb
|
||||
// This is to prevent the MQTT message to be too large.
|
||||
// By default, bytes are not encoded to base64 here; you are splitting the raw JPEG/PNG bytes.
|
||||
// However, in MQTT and web contexts, binary data may not be handled well, so base64 is often used.
|
||||
// To avoid base64 encoding, just send the raw []byte chunks as you do here.
|
||||
// If you want to avoid base64, make sure the receiver can handle binary payloads.
|
||||
|
||||
chunkSize := 25 * 1024 // 25KB chunks
|
||||
var chunks [][]byte
|
||||
for i := 0; i < len(bytes); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(bytes) {
|
||||
end = len(bytes)
|
||||
}
|
||||
chunk := bytes[i:end]
|
||||
chunks = append(chunks, chunk)
|
||||
}
|
||||
chunk := bytes[i:end]
|
||||
chunks = append(chunks, chunk)
|
||||
}
|
||||
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): Sending %d chunks of size %d bytes.", len(chunks), chunkSize)
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): Sending %d chunks of size %d bytes.", len(chunks), chunkSize)
|
||||
|
||||
timestamp := time.Now().Unix()
|
||||
for i, chunk := range chunks {
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["id"] = timestamp
|
||||
valueMap["chunk"] = chunk
|
||||
valueMap["chunkIndex"] = i
|
||||
valueMap["chunkSize"] = chunkSize
|
||||
valueMap["chunkCount"] = len(chunks)
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Version: "v1.0.0",
|
||||
Action: "receive-sd-stream",
|
||||
DeviceId: deviceId,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey+"/"+deviceId, 1, false, payload)
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): sent chunk %d/%d to MQTT topic kerberos/hub/%s/%s", i+1, len(chunks), hubKey, deviceId)
|
||||
time.Sleep(33 * time.Millisecond) // Sleep to avoid flooding the MQTT broker with messages
|
||||
} else {
|
||||
log.Log.Info("cloud.HandleLiveStreamSD(): something went wrong while sending acknowledge config to hub: " + string(payload))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
timestamp := time.Now().Unix()
|
||||
for i, chunk := range chunks {
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["id"] = timestamp
|
||||
valueMap["chunk"] = chunk
|
||||
valueMap["chunkIndex"] = i
|
||||
valueMap["chunkSize"] = chunkSize
|
||||
valueMap["chunkCount"] = len(chunks)
|
||||
valueMap["image"] = bytes
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Version: "v1.0.0",
|
||||
Action: "receive-sd-stream",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
@@ -747,12 +771,13 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): sent chunk %d/%d to MQTT topic kerberos/hub/%s", i+1, len(chunks), hubKey)
|
||||
} else {
|
||||
log.Log.Info("cloud.HandleLiveStreamSD(): something went wrong while sending acknowledge config to hub: " + string(payload))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
time.Sleep(1000 * time.Millisecond) // Sleep to avoid flooding the MQTT broker with messages
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
@@ -392,6 +392,11 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
|
||||
configuration.Config.MQTTPassword = value
|
||||
break
|
||||
|
||||
/* MQTT chunking of low-resolution images into multiple messages */
|
||||
case "AGENT_CAPTURE_LIVEVIEW_CHUNKING":
|
||||
configuration.Config.Capture.LiveviewChunking = value
|
||||
break
|
||||
|
||||
/* Real-time streaming of keyframes to a MQTT topic */
|
||||
case "AGENT_REALTIME_PROCESSING":
|
||||
configuration.Config.RealtimeProcessing = value
|
||||
|
||||
@@ -62,6 +62,7 @@ type Capture struct {
|
||||
Snapshots string `json:"snapshots,omitempty"`
|
||||
Motion string `json:"motion,omitempty"`
|
||||
Liveview string `json:"liveview,omitempty"`
|
||||
LiveviewChunking string `json:"liveview_chunking,omitempty" bson:"liveview_chunking,omitempty"`
|
||||
Continuous string `json:"continuous,omitempty"`
|
||||
PostRecording int64 `json:"postrecording"`
|
||||
PreRecording int64 `json:"prerecording"`
|
||||
|
||||
@@ -281,7 +281,8 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candateBinary, err := json.Marshal(candateJSON)
|
||||
if err == nil {
|
||||
valueMap["candidate"] = string(candateBinary)
|
||||
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
// SDP is not needed to be send..
|
||||
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
} else {
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): something went wrong while marshalling candidate: " + err.Error())
|
||||
|
||||
Reference in New Issue
Block a user