Adding Kafka telemetry

This commit is contained in:
stephb9959
2022-01-09 16:14:11 -08:00
parent 467261b9ca
commit 21bdeab870
7 changed files with 181 additions and 58 deletions

View File

@@ -25,6 +25,7 @@
#include "framework/RESTAPI_errors.h"
#include "framework/ConfigurationValidator.h"
#include "rttys/RTTYS_server.h"
#include "WebSocketServer.h"
namespace OpenWifi {
@@ -824,62 +825,65 @@ void RESTAPI_device_commandHandler::MakeRequest() {
void RESTAPI_device_commandHandler::Telemetry(){
Logger_.information(Poco::format("TELEMETRY: user=%s serial=%s", UserInfo_.userinfo.email,SerialNumber_));
auto Obj = ParseStream();
auto Obj = ParseStream();
if (Obj->has(RESTAPI::Protocol::SERIALNUMBER) &&
Obj->has(RESTAPI::Protocol::INTERVAL) && Obj->has(RESTAPI::Protocol::TYPES)) {
Obj->has(RESTAPI::Protocol::INTERVAL) &&
Obj->has(RESTAPI::Protocol::TYPES)) {
auto SNum = Obj->get(RESTAPI::Protocol::SERIALNUMBER).toString();
if (SerialNumber_ != SNum) {
return BadRequest(RESTAPI::Errors::SerialNumberMismatch);
}
GWObjects::Device Device;
if (!StorageService()->GetDevice(SerialNumber_, Device)) {
return NotFound();
}
if (!DeviceRegistry()->Connected(SerialNumber_)) {
return BadRequest(RESTAPI::Errors::DeviceNotConnected);
}
auto Interval = Obj->get(RESTAPI::Protocol::INTERVAL);
std::string UUID;
if (Obj->has(RESTAPI::Protocol::UUID))
UUID = Obj->get(RESTAPI::Protocol::UUID).toString();
uint64_t Lifetime = 60 * 60 ; // 1 hour
uint64_t Interval = 5;
bool KafkaOnly = false;
GWObjects::CommandDetails Cmd;
Cmd.SerialNumber = SerialNumber_;
Cmd.SubmittedBy = UserInfo_.webtoken.username_;
Cmd.Command = uCentralProtocol::TELEMETRY;
Poco::JSON::Object Params;
Params.set(RESTAPI::Protocol::SERIALNUMBER, SerialNumber_);
Params.set(RESTAPI::Protocol::INTERVAL, Interval);
if (Interval > 0)
Params.set(RESTAPI::Protocol::TYPES, Obj->getArray(RESTAPI::Protocol::TYPES));
std::string Endpoint, NewUUID;
Poco::JSON::Object Answer;
if (Interval) {
if (TelemetryStream()->CreateEndpoint(SerialNumber_, Endpoint, NewUUID)) {
Answer.set("serialNumber", SerialNumber_);
Answer.set("uuid", NewUUID);
Answer.set("uri", Endpoint);
}
} else {
return BadRequest(RESTAPI::Errors::CannotCreateWS);
if(Obj->has("kafkaOnly")) {
KafkaOnly = Obj->get("kafkaOnly").toString()=="true";
}
Cmd.UUID = NewUUID;
std::stringstream ParamStream;
Params.stringify(ParamStream);
Cmd.Details = ParamStream.str();
AssignIfPresent(Obj, RESTAPI::Protocol::INTERVAL, Interval);
AssignIfPresent(Obj, RESTAPI::Protocol::LIFETIME, Lifetime);
return RESTAPI_RPC::WaitForCommand(Cmd, Params, *Request, *Response,
60000ms, &Answer, this, Logger_);
auto DeviceConnection = DeviceRegistry()->GetDeviceConnection(SerialNumber_);
if(DeviceConnection->WSConn_== nullptr) {
return BadRequest(RESTAPI::Errors::DeviceNotConnected);
}
Poco::JSON::Object Answer;
if(KafkaOnly) {
if (Interval) {
DeviceConnection->WSConn_->SetKafkaTelemetryReporting(Interval, Lifetime);
Answer.set("action", "Kafka telemetry started.");
} else {
DeviceConnection->WSConn_->StopKafkaTelemetry();
Answer.set("action", "Kafka telemetry stopped.");
}
return ReturnObject(Answer);
} else {
DeviceConnection->WSConn_->SetWebSocketTelemetryReporting(Interval, Lifetime);
if (Interval) {
std::string EndPoint;
auto NewUUID = MicroService::instance().CreateUUID();
if (TelemetryStream()->CreateEndpoint(SerialNumber_, EndPoint, NewUUID)) {
Answer.set("serialNumber", SerialNumber_);
Answer.set("uuid", NewUUID);
Answer.set("uri", EndPoint);
return ReturnObject(Answer);
}
return BadRequest(RESTAPI::Errors::InternalError);
}
Answer.set("action", "WebSocket telemetry stopped.");
DeviceConnection->WSConn_->StopWebSocketTelemetry();
return ReturnObject(Answer);
}
}
BadRequest(RESTAPI::Errors::MissingOrInvalidParameters);
return BadRequest(RESTAPI::Errors::MissingOrInvalidParameters);
}
}