stephb9959
2023-02-21 12:19:16 -08:00
parent efb2623613
commit 383343a2e5
113 changed files with 13223 additions and 13071 deletions

View File

@@ -5,87 +5,95 @@
#include "NewCommandHandler.h"
#include "StorageService.h"
#include "framework/KafkaManager.h"
#include "fmt/format.h"
#include "framework/KafkaManager.h"
#include "nlohmann/json.hpp"
namespace OpenWifi {
void NewCommandHandler::run() {
Running_ = true ;
Utils::SetThreadName("cmd-handler");
while(Running_) {
Poco::Thread::trySleep(2000);
void NewCommandHandler::run() {
Running_ = true;
Utils::SetThreadName("cmd-handler");
while (Running_) {
Poco::Thread::trySleep(2000);
if(!Running_)
break;
if (!Running_)
break;
while(!NewCommands_.empty()) {
if(!Running_)
break;
while (!NewCommands_.empty()) {
if (!Running_)
break;
Types::StringPair S;
{
std::lock_guard G(Mutex_);
S = NewCommands_.front();
NewCommands_.pop();
}
Types::StringPair S;
{
std::lock_guard G(Mutex_);
S = NewCommands_.front();
NewCommands_.pop();
}
try {
auto SerialNumber = S.first;
auto M = nlohmann::json::parse(S.second);
try {
auto SerialNumber = S.first;
auto M = nlohmann::json::parse(S.second);
std::string EndPoint;
std::string EndPoint;
if(M.contains(uCentralProtocol::SYSTEM)) {
auto SystemObj = M[uCentralProtocol::SYSTEM];
if(SystemObj.contains(uCentralProtocol::HOST))
EndPoint = SystemObj[uCentralProtocol::HOST];
}
if (M.contains(uCentralProtocol::SYSTEM)) {
auto SystemObj = M[uCentralProtocol::SYSTEM];
if (SystemObj.contains(uCentralProtocol::HOST))
EndPoint = SystemObj[uCentralProtocol::HOST];
}
if(M.contains(uCentralProtocol::PAYLOAD)) {
auto PayloadSection = M[uCentralProtocol::PAYLOAD];
if(PayloadSection.contains("command")) {
auto Command = PayloadSection["command"];
if(Command=="delete_device") {
auto pSerialNumber = PayloadSection["payload"]["serialNumber"];
if(pSerialNumber==SerialNumber) {
poco_debug(Logger(),fmt::format("Removing device '{}' from upgrade history.",SerialNumber));
StorageService()->HistoryDB().DeleteHistory(SerialNumber);
poco_debug(Logger(),fmt::format("Removing device '{}' from device table.",SerialNumber));
StorageService()->DevicesDB().DeleteDevice(SerialNumber);
}
}
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
}
};
if (M.contains(uCentralProtocol::PAYLOAD)) {
auto PayloadSection = M[uCentralProtocol::PAYLOAD];
if (PayloadSection.contains("command")) {
auto Command = PayloadSection["command"];
if (Command == "delete_device") {
auto pSerialNumber = PayloadSection["payload"]["serialNumber"];
if (pSerialNumber == SerialNumber) {
poco_debug(
Logger(),
fmt::format("Removing device '{}' from upgrade history.",
SerialNumber));
StorageService()->HistoryDB().DeleteHistory(SerialNumber);
poco_debug(
Logger(),
fmt::format("Removing device '{}' from device table.",
SerialNumber));
StorageService()->DevicesDB().DeleteDevice(SerialNumber);
}
}
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
}
};
int NewCommandHandler::Start() {
Types::TopicNotifyFunction F = [this](std::string s1,std::string s2) { this->CommandReceived(s1,s2); };
WatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::COMMAND, F);
Worker_.start(*this);
return 0;
};
int NewCommandHandler::Start() {
Types::TopicNotifyFunction F = [this](std::string s1, std::string s2) {
this->CommandReceived(s1, s2);
};
WatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::COMMAND, F);
Worker_.start(*this);
return 0;
};
void NewCommandHandler::Stop() {
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::COMMAND, WatcherId_);
Running_ = false;
Worker_.wakeUp();
Worker_.join();
};
void NewCommandHandler::Stop() {
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::COMMAND, WatcherId_);
Running_ = false;
Worker_.wakeUp();
Worker_.join();
};
bool NewCommandHandler::Update() {
Worker_.wakeUp();
return true;
}
bool NewCommandHandler::Update() {
Worker_.wakeUp();
return true;
}
void NewCommandHandler::CommandReceived( const std::string & Key, const std::string & Message) {
std::lock_guard G(Mutex_);
NewCommands_.push(std::make_pair(Key,Message));
}
}
void NewCommandHandler::CommandReceived(const std::string &Key, const std::string &Message) {
std::lock_guard G(Mutex_);
NewCommands_.push(std::make_pair(Key, Message));
}
} // namespace OpenWifi