Files
wlan-cloud-ucentralgw/src/AP_WS_Process_telemetry.cpp
2023-12-14 07:23:48 -08:00

67 lines
2.2 KiB
C++

//
// Created by stephane bourque on 2022-07-26.
//
#include "AP_WS_Connection.h"
#include "CommandManager.h"
#include "TelemetryStream.h"
#include "fmt/format.h"
#include "framework/KafkaManager.h"
#include "framework/utils.h"
namespace OpenWifi {

	/// Handles a "telemetry" notification received from the device on this connection.
	///
	/// Processing order:
	///  1. If the connection state is not marked as connected, the message is counted as a
	///     protocol error and dropped.
	///  2. If telemetry reporting is off and the message carries no "adhoc" key, the device
	///     is told to stop sending telemetry.
	///  3. The "data" member must be a JSON object; anything else is logged as an invalid
	///     packet and ignored.
	///  4. The payload is stamped with the current time and serialized once, then:
	///       - "adhoc" messages are posted to the Kafka device telemetry topic and
	///         processing ends (no counters or timers are involved);
	///       - otherwise the payload is forwarded to the WebSocket and/or Kafka consumers
	///         while their respective timers have not expired, and the matching stream is
	///         stopped once its timer has elapsed.
	///
	/// @param ParamsObj The "params" object of the telemetry message. Its "data" object is
	///                  modified in place: a "timestamp" member is set on it.
	void AP_WS_Connection::Process_telemetry(Poco::JSON::Object::Ptr ParamsObj) {
		if (!State_.Connected) {
			poco_warning(Logger_,
						 fmt::format("INVALID-PROTOCOL({}): Device '{}' is not following protocol",
									 CId_, CN_));
			Errors_++;
			return;
		}

		poco_trace(Logger_, fmt::format("Telemetry data received for {}", SerialNumber_));

		const bool IsAdhoc = ParamsObj->has("adhoc");
		if (!TelemetryReporting_ && !IsAdhoc) {
			// if we are ignoring telemetry, then close it down on the device.
			poco_debug(Logger_,
					   fmt::format("TELEMETRY({}): Stopping runaway telemetry.", SerialNumber_));
			StopTelemetry(CommandManager()->Next_RPC_ID());
			return;
		}

		// getObject() returns a null pointer when "data" is absent or is not a JSON object,
		// so a malformed payload coming from the device ends up in the "invalid packet"
		// path instead of raising a conversion exception out of extract<>().
		auto Payload = ParamsObj->getObject("data");
		if (Payload.isNull()) {
			poco_debug(Logger_,
					   fmt::format("TELEMETRY({}): Invalid telemetry packet.", SerialNumber_));
			return;
		}

		// Read the clock once so the stamped value and the expiry checks agree.
		const auto CurrentTime = Utils::Now();
		Payload->set("timestamp", CurrentTime);

		std::ostringstream PayloadStream;
		Payload->stringify(PayloadStream);
		const auto KafkaPayload = PayloadStream.str();

		if (IsAdhoc) {
			// Ad-hoc telemetry is always posted, independently of the streaming timers.
			KafkaManager()->PostMessage(KafkaTopics::DEVICE_TELEMETRY, SerialNumber_,
										KafkaPayload);
			return;
		}

		if (TelemetryWebSocketRefCount_) {
			if (CurrentTime < TelemetryWebSocketTimer_) {
				TelemetryWebSocketPackets_++;
				State_.websocketPackets = TelemetryWebSocketPackets_;
				TelemetryStream()->NotifyEndPoint(SerialNumberInt_, KafkaPayload);
			} else {
				// The WebSocket streaming window has elapsed.
				StopWebSocketTelemetry(CommandManager()->Next_RPC_ID());
			}
		}

		if (TelemetryKafkaRefCount_) {
			if (KafkaManager()->Enabled() && CurrentTime < TelemetryKafkaTimer_) {
				TelemetryKafkaPackets_++;
				State_.kafkaPackets = TelemetryKafkaPackets_;
				KafkaManager()->PostMessage(KafkaTopics::DEVICE_TELEMETRY, SerialNumber_,
											KafkaPayload);
			} else {
				// Kafka is disabled or the Kafka streaming window has elapsed.
				StopKafkaTelemetry(CommandManager()->Next_RPC_ID());
			}
		}
	}

} // namespace OpenWifi