Adding Kafka telemetry

This commit is contained in:
stephb9959
2022-01-09 20:50:01 -08:00
parent 21bdeab870
commit 00e1d15203
3 changed files with 42 additions and 11 deletions

View File

@@ -857,6 +857,15 @@ void RESTAPI_device_commandHandler::MakeRequest() {
}
Poco::JSON::Object Answer;
bool TelemetryRunning;
uint64_t TelemetryWebSocketCount, TelemetryKafkaCount, TelemetryInterval, TelemetryWebSocketTimer, TelemetryKafkaTimer;
DeviceConnection->WSConn_->GetTelemetryParameters(TelemetryRunning,
TelemetryInterval,
TelemetryWebSocketTimer,
TelemetryKafkaTimer,
TelemetryWebSocketCount,
TelemetryKafkaCount);
if(KafkaOnly) {
if (Interval) {
DeviceConnection->WSConn_->SetKafkaTelemetryReporting(Interval, Lifetime);
@@ -865,24 +874,44 @@ void RESTAPI_device_commandHandler::MakeRequest() {
DeviceConnection->WSConn_->StopKafkaTelemetry();
Answer.set("action", "Kafka telemetry stopped.");
}
return ReturnObject(Answer);
} else {
DeviceConnection->WSConn_->SetWebSocketTelemetryReporting(Interval, Lifetime);
if (Interval) {
std::string EndPoint;
auto NewUUID = MicroService::instance().CreateUUID();
if (TelemetryStream()->CreateEndpoint(SerialNumber_, EndPoint, NewUUID)) {
Answer.set("action", "WebSocket telemetry started.");
Answer.set("serialNumber", SerialNumber_);
Answer.set("uuid", NewUUID);
Answer.set("uri", EndPoint);
return ReturnObject(Answer);
} else {
return BadRequest(RESTAPI::Errors::InternalError);
}
return BadRequest(RESTAPI::Errors::InternalError);
}
Answer.set("action", "WebSocket telemetry stopped.");
DeviceConnection->WSConn_->StopWebSocketTelemetry();
return ReturnObject(Answer);
}
DeviceConnection->WSConn_->GetTelemetryParameters(TelemetryRunning,
TelemetryInterval,
TelemetryWebSocketTimer,
TelemetryKafkaTimer,
TelemetryWebSocketCount,
TelemetryKafkaCount);
Poco::JSON::Object TelemetryStatus;
TelemetryStatus.set("running", TelemetryRunning);
TelemetryStatus.set("interval", TelemetryInterval);
TelemetryStatus.set("websocketTimer", TelemetryWebSocketTimer);
TelemetryStatus.set("kafkaTimer", TelemetryKafkaTimer);
TelemetryStatus.set("websocketClients", TelemetryWebSocketCount);
TelemetryStatus.set("kafkaClients", TelemetryKafkaCount);
Answer.set("status", TelemetryStatus);
std::ostringstream ooss;
Answer.stringify(ooss);
std::cout << "Telemetry status: " << ooss.str() << std::endl;
return ReturnObject(Answer);
}
return BadRequest(RESTAPI::Errors::MissingOrInvalidParameters);
}