resolve conflicts

This commit is contained in:
stephb9959
2021-09-27 22:12:12 +00:00
parent 0fa0af3159
commit eb361cad14
15 changed files with 1203 additions and 1423 deletions

View File

@@ -24,12 +24,15 @@
#include "Utils.h"
#include "WebSocketServer.h"
#include "uCentralProtocol.h"
#include "TelemetryStream.h"
#include "ConfigurationCache.h"
namespace OpenWifi {
class WebSocketServer *WebSocketServer::instance_ = nullptr;
WebSocketServer::WebSocketServer() noexcept: SubSystemServer("WebSocketServer", "WS-SVR", "ucentral.websocket")
WebSocketServer::WebSocketServer() noexcept:
SubSystemServer("WebSocketServer", "WS-SVR", "ucentral.websocket")
{
}
@@ -47,7 +50,7 @@ namespace OpenWifi {
}
int WebSocketServer::Start() {
ReactorPool_.Start();
for(const auto & Svr : ConfigServersList_ ) {
Logger_.notice(Poco::format("Starting: %s:%s Keyfile:%s CertFile: %s", Svr.Address(), std::to_string(Svr.Port()),
Svr.KeyFile(),Svr.CertFile()));
@@ -62,7 +65,7 @@ namespace OpenWifi {
IssuerCert_ = std::make_unique<Poco::Crypto::X509Certificate>(Svr.IssuerCertFile());
Logger_.information(Poco::format("Certificate Issuer Name:%s",IssuerCert_->issuerName()));
}
auto NewSocketAcceptor = std::make_unique<Poco::Net::ParallelSocketAcceptor<WSConnection, Poco::Net::SocketReactor>>( Sock, Reactor_);
auto NewSocketAcceptor = std::make_unique<Poco::Net::ParallelSocketAcceptor<WSConnection, Poco::Net::SocketReactor>>(Sock, Reactor_);
Acceptors_.push_back(std::move(NewSocketAcceptor));
}
ReactorThread_.start(Reactor_);
@@ -71,7 +74,7 @@ namespace OpenWifi {
void WebSocketServer::Stop() {
Logger_.notice("Stopping reactors...");
ReactorPool_.Stop();
Reactor_.stop();
ReactorThread_.join();
}
@@ -81,9 +84,9 @@ namespace OpenWifi {
}
void WSConnection::CompleteStartup() {
std::lock_guard Guard(WSMutex_);
std::lock_guard Guard(Mutex_);
try {
auto SS = dynamic_cast<Poco::Net::SecureStreamSocketImpl *>(Socket_.impl());
SS->completeHandshake();
@@ -115,9 +118,7 @@ namespace OpenWifi {
} else {
Logger_.error(Poco::format("%s: No certificates available..", CId_));
}
auto Params =
Poco::AutoPtr<Poco::Net::HTTPServerParams>(new Poco::Net::HTTPServerParams);
auto Params = Poco::AutoPtr<Poco::Net::HTTPServerParams>(new Poco::Net::HTTPServerParams);
Poco::Net::HTTPServerSession Session(Socket_, Params);
Poco::Net::HTTPServerResponseImpl Response(Session);
Poco::Net::HTTPServerRequestImpl Request(Response, Session, Params);
@@ -125,11 +126,9 @@ namespace OpenWifi {
auto Now = time(nullptr);
Response.setDate(Now);
Response.setVersion(Request.getVersion());
Response.setKeepAlive(Params->getKeepAlive() && Request.getKeepAlive() &&
Session.canKeepAlive());
Response.setKeepAlive(Params->getKeepAlive() && Request.getKeepAlive() && Session.canKeepAlive());
WS_ = std::make_unique<Poco::Net::WebSocket>(Request, Response);
WS_->setMaxPayloadSize(BufSize);
auto TS = Poco::Timespan(240,0);
WS_->setReceiveTimeout(TS);
@@ -145,16 +144,18 @@ namespace OpenWifi {
Poco::NObserver<WSConnection, Poco::Net::ErrorNotification>(
*this, &WSConnection::OnSocketError));
Registered_ = true;
Logger_.information(Poco::format("CONNECTION(%s): completed.",CId_));
return;
} catch (const Poco::Exception &E ) {
Logger_.error("Exception caught during device connection. Device will have to retry.");
Logger_.log(E);
}
delete this;
}
WSConnection::WSConnection(Poco::Net::StreamSocket & socket, Poco::Net::SocketReactor & reactor):
Socket_(socket),
Reactor_(reactor),
Reactor_(WebSocketServer()->GetNextReactor()),
Logger_(WebSocketServer()->Logger())
{
std::thread T([this](){ this->CompleteStartup();});
@@ -196,53 +197,50 @@ namespace OpenWifi {
bool WSConnection::LookForUpgrade(uint64_t UUID) {
// A UUID of zero means ignore updates for that connection.
if(UUID==0)
return false;
std::string NewConfig;
uint64_t NewConfigUUID = 0 ;
uint64_t GoodConfig=ConfigurationCache().CurrentConfig(SerialNumber_);
if(GoodConfig && (GoodConfig==UUID || GoodConfig==Conn_->PendingUUID))
return false;
if (Storage()->ExistingConfiguration(SerialNumber_,UUID, NewConfig, NewConfigUUID)) {
GWObjects::Device D;
if(Storage()->GetDevice(SerialNumber_,D)) {
// Device is already using the latest configuration.
if(UUID == NewConfigUUID)
// This is the case where the cache is empty after a restart. So GoodConfig will 0. If the device already
// has the right UUID, we just return.
if(D.UUID == UUID ) {
ConfigurationCache().Add(SerialNumber_,UUID);
return false;
}
// if the new config is already pending,
if(NewConfigUUID == Conn_->PendingUUID)
return false;
Conn_->PendingUUID = NewConfigUUID;
Poco::JSON::Parser Parser( new Poco::JSON::ParseHandler);
auto ParsedConfig = Parser.parse(NewConfig).extract<Poco::JSON::Object::Ptr>();
ParsedConfig->set(uCentralProtocol::UUID,NewConfigUUID);
// create the command stub...
Conn_->PendingUUID = D.UUID;
GWObjects::CommandDetails Cmd;
Cmd.SerialNumber = SerialNumber_;
Cmd.UUID = Daemon()->CreateUUID();
Cmd.SubmittedBy = uCentralProtocol::SUBMITTED_BY_SYSTEM;
Cmd.Status = uCentralProtocol::PENDING;
Cmd.Command = uCentralProtocol::CONFIGURE;
Poco::JSON::Parser P;
auto ParsedConfig = P.parse(D.Configuration).extract<Poco::JSON::Object::Ptr>();
Poco::JSON::Object Params;
Params.set(uCentralProtocol::SERIAL, SerialNumber_);
Params.set(uCentralProtocol::UUID, NewConfigUUID);
Params.set(uCentralProtocol::UUID, D.UUID);
Params.set(uCentralProtocol::WHEN, 0);
Params.set(uCentralProtocol::CONFIG, ParsedConfig);
std::string Log = Poco::format("CFG-UPGRADE(%s):, Current ID: %Lu, newer configuration %Lu.", SerialNumber_, UUID, NewConfigUUID);
Storage()->AddLog(SerialNumber_, Conn_->UUID, Log);
Logger_.debug(Log);
std::ostringstream O;
Poco::JSON::Stringifier::stringify(Params, O);
Cmd.Details = O.str();
std::string Log = Poco::format("CFG-UPGRADE(%s): Current ID: %Lu, newer configuration %Lu.", CId_, UUID, D.UUID);
Logger_.debug(Log);
uint64_t RPC_Id;
CommandManager()->SendCommand(SerialNumber_ , Cmd.Command, Params, Cmd.UUID, RPC_Id);
Storage()->AddCommand(SerialNumber_, Cmd, Storage::COMMAND_EXECUTED);
return true;
}
}
return false;
}
@@ -349,7 +347,6 @@ namespace OpenWifi {
}
Conn_->VerifiedCertificate = CertValidation_;
std::string Compatible;
if (Daemon()->AutoProvisioning() && !Storage()->DeviceExists(SerialNumber_)) {
Storage()->CreateDefaultDevice(SerialNumber_, Capabilities, Firmware, Compatible_);
} else if (Storage()->DeviceExists(SerialNumber_)) {
@@ -392,11 +389,11 @@ namespace OpenWifi {
else
Logger_.debug(Poco::format("STATE(%s): UUID=%Lu Updating for CMD=%s.", CId_,
UUID, request_uuid));
Conn_->UUID = UUID;
Storage()->AddStatisticsData(Serial, UUID, State);
DeviceRegistry()->SetStatistics(Serial, State);
LookForUpgrade(UUID);
GWObjects::Statistics Stats{ .SerialNumber = SerialNumber_, .UUID = UUID, .Data = State};
Stats.Recorded = std::time(nullptr);
Storage()->AddStatisticsData(Stats);
if (!request_uuid.empty()) {
Storage()->SetCommandResult(request_uuid, State);
}
@@ -437,15 +434,17 @@ namespace OpenWifi {
CId_, UUID, request_uuid));
Conn_->UUID = UUID;
LookForUpgrade(UUID);
GWObjects::HealthCheck Check;
Check.Recorded = time(nullptr);
Check.SerialNumber = SerialNumber_;
Check.Recorded = std::time(nullptr);
Check.UUID = UUID;
Check.Data = CheckData;
Check.Sanity = Sanity;
Storage()->AddHealthCheckData(Serial, Check);
Storage()->AddHealthCheckData(Check);
if (!request_uuid.empty()) {
Storage()->SetCommandResult(request_uuid, CheckData);
@@ -455,6 +454,7 @@ namespace OpenWifi {
if(KafkaManager()->Enabled()) {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
ParamsObj->set("timestamp",std::time(nullptr));
Stringify.condense(ParamsObj,OS);
KafkaManager()->PostMessage(KafkaTopics::HEALTHCHECK, SerialNumber_, OS.str());
}
@@ -477,14 +477,14 @@ namespace OpenWifi {
DataStr = DataObj.toString();
}
GWObjects::DeviceLog DeviceLog{.Log = Log,
.Data = DataStr,
.Severity = Severity,
.Recorded = (uint64_t)time(nullptr),
.LogType = 0,
.UUID = Conn_->UUID};
Storage()->AddLog(Serial, DeviceLog);
GWObjects::DeviceLog DeviceLog{ .SerialNumber = SerialNumber_,
.Log = Log,
.Data = DataStr,
.Severity = Severity,
.Recorded = (uint64_t)time(nullptr),
.LogType = 0,
.UUID = Conn_->UUID};
Storage()->AddLog(DeviceLog);
} else {
Logger_.warning(Poco::format("LOG(%s): Missing parameters.", CId_));
return;
@@ -505,14 +505,15 @@ namespace OpenWifi {
}
GWObjects::DeviceLog DeviceLog{
.SerialNumber = SerialNumber_,
.Log = LogText,
.Data = "",
.Severity = GWObjects::DeviceLog::LOG_EMERG,
.Recorded = (uint64_t)time(nullptr),
.LogType = 1,
.UUID = Conn_->UUID};
Storage()->AddLog(DeviceLog);
Storage()->AddLog(Serial, DeviceLog, true);
} else {
Logger_.warning(Poco::format("LOG(%s): Missing parameters.", CId_));
return;
@@ -549,9 +550,6 @@ namespace OpenWifi {
ParamsObj->has(uCentralProtocol::UUID) && ParamsObj->has(uCentralProtocol::REBOOT) &&
ParamsObj->has(uCentralProtocol::LOGLINES)) {
uint64_t UUID = ParamsObj->get(uCentralProtocol::UUID);
uint64_t Reboot = ParamsObj->get(uCentralProtocol::REBOOT);
auto Firmware = ParamsObj->get(uCentralProtocol::FIRMWARE).toString();
auto LogLines = ParamsObj->get(uCentralProtocol::LOGLINES);
std::string LogText;
if (LogLines.isArray()) {
@@ -579,6 +577,14 @@ namespace OpenWifi {
}
break;
case uCentralProtocol::ET_TELEMETRY: {
if(ParamsObj->has("data")) {
auto Payload = ParamsObj->get("data").toString();
TelemetryStream()->UpdateEndPoint(SerialNumber_, Payload);
}
}
break;
// this will never be called but some compilers will complain if we do not have a case for
// every single values of an enum
case uCentralProtocol::ET_UNKNOWN: {
@@ -589,38 +595,37 @@ namespace OpenWifi {
}
void WSConnection::OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification>& pNf) {
std::lock_guard Guard(WSMutex_);
std::lock_guard Guard(Mutex_);
Logger_.information(Poco::format("SOCKET-SHUTDOWN(%s): Closing.",CId_));
std::cout << "Socket shutdown for " << SerialNumber_ << std::endl;
delete this;
}
void WSConnection::OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification>& pNf) {
std::lock_guard Guard(WSMutex_);
std::lock_guard Guard(Mutex_);
Logger_.information(Poco::format("SOCKET-ERROR(%s): Closing.",CId_));
std::cout << "Socket error for " << SerialNumber_ << std::endl;
delete this;
}
void WSConnection::OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification>& pNf) {
std::lock_guard Guard(WSMutex_);
std::lock_guard Guard(Mutex_);
try
{
ProcessIncomingFrame();
}
catch ( const Poco::Exception & E )
catch (const Poco::Exception & E)
{
Logger_.log(E);
delete this;
}
catch ( const std::exception & E) {
catch (const std::exception & E) {
std::string W = E.what();
Logger_.information(Poco::format("std::exception caught: %s. Connection terminated with %s",W,CId_));
delete this;
}
catch ( ... ) {
Logger_.information(Poco::format("Unknown exception for %s. Connection terminated.",CId_));
delete this;
}
}
std::string asString(Poco::Buffer<char> & buf ) {
@@ -632,20 +637,18 @@ namespace OpenWifi {
}
void WSConnection::ProcessIncomingFrame() {
int flags, Op;
int IncomingSize;
bool MustDisconnect=false;
Poco::Buffer<char> IncomingFrame(0);
try {
int Op,flags;
int IncomingSize;
IncomingSize = WS_->receiveFrame(IncomingFrame,flags);
Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
// std::cout << "ID:" << CId_ << " Size=" << IncomingSize << " Flags=" << flags << " Op=" << Op << std::endl;
if (IncomingSize == 0 && flags == 0 && Op == 0) {
Logger_.information(Poco::format("DISCONNECT(%s)", CId_));
Logger_.information(Poco::format("DISCONNECT(%s): device has disconnected.", CId_));
MustDisconnect = true;
} else {
switch (Op) {
@@ -701,8 +704,14 @@ namespace OpenWifi {
Logger_.error(Poco::format("FRAME(%s): illegal transaction header, missing 'jsonrpc'",CId_));
Errors_++;
}
break;
}
}
break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
Logger_.warning(Poco::format("CLOSE(%s): Device is closing its connection.",CId_));
MustDisconnect = true;
}
break;
default: {
Logger_.warning(Poco::format("UNKNOWN(%s): unknownWS Frame operation: %s",CId_, std::to_string(Op)));
@@ -775,19 +784,18 @@ namespace OpenWifi {
MustDisconnect = true ;
}
if(!MustDisconnect && Errors_<10) {
return;
}
if(!MustDisconnect && Errors_<10)
return;
if(Errors_>10) {
Logger_.information(Poco::format("DISCONNECTING(%s): Too many errors",CId_));
}
if(Errors_>10) {
Logger_.information(Poco::format("DISCONNECTING(%s): Too many errors",CId_));
}
delete this;
}
bool WSConnection::Send(const std::string &Payload) {
std::lock_guard Guard(WSMutex_);
std::lock_guard Guard(Mutex_);
auto BytesSent = WS_->sendFrame(Payload.c_str(),(int)Payload.size());
if(Conn_)