Files
wlan-cloud-ucentralfms/src/NewCommandHandler.cpp
2023-02-21 12:19:16 -08:00

99 lines
2.5 KiB
C++

//
// Created by stephane bourque on 2021-11-21.
//
#include "NewCommandHandler.h"

#include <exception>
#include <mutex>
#include <string>
#include <utility>

#include "StorageService.h"
#include "fmt/format.h"
#include "framework/KafkaManager.h"
#include "nlohmann/json.hpp"
namespace OpenWifi {
void NewCommandHandler::run() {
Running_ = true;
Utils::SetThreadName("cmd-handler");
while (Running_) {
Poco::Thread::trySleep(2000);
if (!Running_)
break;
while (!NewCommands_.empty()) {
if (!Running_)
break;
Types::StringPair S;
{
std::lock_guard G(Mutex_);
S = NewCommands_.front();
NewCommands_.pop();
}
try {
auto SerialNumber = S.first;
auto M = nlohmann::json::parse(S.second);
std::string EndPoint;
if (M.contains(uCentralProtocol::SYSTEM)) {
auto SystemObj = M[uCentralProtocol::SYSTEM];
if (SystemObj.contains(uCentralProtocol::HOST))
EndPoint = SystemObj[uCentralProtocol::HOST];
}
if (M.contains(uCentralProtocol::PAYLOAD)) {
auto PayloadSection = M[uCentralProtocol::PAYLOAD];
if (PayloadSection.contains("command")) {
auto Command = PayloadSection["command"];
if (Command == "delete_device") {
auto pSerialNumber = PayloadSection["payload"]["serialNumber"];
if (pSerialNumber == SerialNumber) {
poco_debug(
Logger(),
fmt::format("Removing device '{}' from upgrade history.",
SerialNumber));
StorageService()->HistoryDB().DeleteHistory(SerialNumber);
poco_debug(
Logger(),
fmt::format("Removing device '{}' from device table.",
SerialNumber));
StorageService()->DevicesDB().DeleteDevice(SerialNumber);
}
}
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
}
};
/// Subscribes to the Kafka command topic and launches the worker thread.
///
/// The registered callback forwards every (key, payload) pair to
/// CommandReceived(); the id returned by the registration is kept in
/// WatcherId_ so that Stop() can unregister the same watcher.
///
/// @return Always 0.
int NewCommandHandler::Start() {
	// Take the strings by const reference: they are only forwarded to
	// CommandReceived(), which itself takes const references, so by-value
	// parameters would copy both strings on every message for no benefit.
	Types::TopicNotifyFunction F = [this](const std::string &Key, const std::string &Payload) {
		CommandReceived(Key, Payload);
	};
	WatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::COMMAND, F);
	Worker_.start(*this);
	return 0;
}
/// Shuts the handler down: unregisters the Kafka topic watcher, asks the
/// worker loop to finish, and blocks until the worker thread has exited.
void NewCommandHandler::Stop() {
	// Unregister first so the callback stops feeding the queue.
	KafkaManager()->UnregisterTopicWatcher(KafkaTopics::COMMAND, WatcherId_);

	// The flag has to be cleared before the wake-up: run() re-tests it as soon
	// as its sleep is interrupted.
	Running_ = false;
	Worker_.wakeUp();
	Worker_.join();
}
/// Wakes the worker thread so that it starts its next queue-drain pass without
/// waiting for the rest of its sleep interval.
///
/// @return Always true.
bool NewCommandHandler::Update() {
Worker_.wakeUp();
return true;
}
/// Queues one incoming command for the worker thread.
///
/// The pair is appended while holding Mutex_. The worker is not woken here;
/// the entry is handled on the worker's next drain pass.
///
/// @param Key     Message key; run() treats it as the device serial number.
/// @param Message Message body; run() parses it as JSON.
void NewCommandHandler::CommandReceived(const std::string &Key, const std::string &Message) {
	std::lock_guard Guard(Mutex_);
	NewCommands_.emplace(Key, Message);
}
} // namespace OpenWifi