Files
archived-wlan-cloud-ucentralgw/src/CommandManager.cpp
2022-01-27 11:46:53 -08:00

183 lines
5.8 KiB
C++

//
// License type: BSD 3-Clause License
// License copy: https://github.com/Telecominfraproject/wlan-cloud-ucentralgw/blob/master/LICENSE
//
// Created by Stephane Bourque on 2021-03-04.
// Arilia Wireless Inc.
//
#include <algorithm>
#include "Poco/JSON/Parser.h"
#include "CommandManager.h"
#include "DeviceRegistry.h"
#include "RESTObjects//RESTAPI_GWobjects.h"
#include "StorageService.h"
#include "framework/MicroService.h"
#include "framework/uCentral_Protocol.h"
namespace OpenWifi {
void CommandManager::run() {
Running_ = true;
while(Running_)
{
Poco::Thread::trySleep(30000);
if(!Running_)
break;
std::vector<GWObjects::CommandDetails> Commands;
if(StorageService()->GetReadyToExecuteCommands(0,200,Commands))
{
for(auto & Cmd: Commands)
{
if(!Running_)
break;
try {
Poco::JSON::Parser P;
bool Sent;
Logger().information(Poco::format("Parsing: %s", Cmd.UUID));
auto Params = P.parse(Cmd.Details).extract<Poco::JSON::Object::Ptr>();
Logger().information(Poco::format("Parsed: %s", Cmd.UUID));
auto Result = PostCommandDisk( Cmd.SerialNumber,
Cmd.Command,
*Params,
Cmd.UUID,
Sent);
if(Sent) {
StorageService()->SetCommandExecuted(Cmd.UUID);
Logger().information(Poco::format("%s: Sent command '%s-%s'", Cmd.SerialNumber, Cmd.Command, Cmd.UUID));
} else {
Logger().information(Poco::format("%s: Could not send command '%s-%s'", Cmd.SerialNumber, Cmd.Command, Cmd.UUID));
}
} catch (const Poco::Exception &E) {
Logger().information(Poco::format("%s: Failed command '%s-%s'", Cmd.SerialNumber, Cmd.Command, Cmd.UUID));
Logger().log(E);
StorageService()->SetCommandExecuted(Cmd.UUID);
} catch (...) {
Logger().information(Poco::format("%s: Exception - hard fail - Failed command '%s-%s'", Cmd.SerialNumber, Cmd.Command, Cmd.UUID));
StorageService()->SetCommandExecuted(Cmd.UUID);
}
}
}
}
}
int CommandManager::Start() {
Logger().notice("Starting...");
ManagerThread.start(*this);
JanitorCallback_ = std::make_unique<Poco::TimerCallback<CommandManager>>(*this,&CommandManager::onTimer);
Timer_.setStartInterval( 10000 );
Timer_.setPeriodicInterval(5 * 60 * 1000); // 1 hours
Timer_.start(*JanitorCallback_);
return 0;
}
void CommandManager::Stop() {
Logger().notice("Stopping...");
Running_ = false;
Timer_.stop();
ManagerThread.wakeUp();
ManagerThread.join();
}
void CommandManager::WakeUp() {
Logger().notice("Waking up...");
ManagerThread.wakeUp();
}
void CommandManager::onTimer(Poco::Timer & timer) {
std::lock_guard G(Mutex_);
Logger().information("Removing expired commands: start");
auto Now = std::chrono::high_resolution_clock::now();
for(auto i=OutStandingRequests_.begin();i!=OutStandingRequests_.end();) {
std::chrono::duration<double, std::milli> delta = Now - i->second->submitted;
if(delta > 120000ms) {
i = OutStandingRequests_.erase(i);
} else {
++i;
}
}
Logger().information("Removing expired commands: done");
}
std::shared_ptr<CommandManager::promise_type_t> CommandManager::PostCommand( const std::string &SerialNumber,
const std::string &Method,
const Poco::JSON::Object &Params,
const std::string &UUID,
bool oneway_rpc,
bool disk_only,
bool & Sent) {
Sent=false;
if(!DeviceRegistry()->Connected(SerialNumber)) {
return nullptr;
}
std::stringstream ToSend;
auto Object = std::make_shared<RpcObject>();
CommandTagIndex Idx;
{
std::lock_guard M(Mutex_);
if (oneway_rpc)
Idx.Id = 1;
else
Idx.Id = ++Id_;
Idx.SerialNumber = SerialNumber;
Poco::JSON::Object CompleteRPC;
CompleteRPC.set(uCentralProtocol::JSONRPC, uCentralProtocol::JSONRPC_VERSION);
CompleteRPC.set(uCentralProtocol::ID, Idx.Id);
CompleteRPC.set(uCentralProtocol::METHOD, Method);
CompleteRPC.set(uCentralProtocol::PARAMS, Params);
Poco::JSON::Stringifier::stringify(CompleteRPC, ToSend);
Logger().information(
Poco::format("(%s): Sending command '%s', ID: %lu", SerialNumber, Method, Idx.Id));
Object->submitted = std::chrono::high_resolution_clock::now();
Object->uuid = UUID;
if(disk_only) {
Object->rpc_entry = nullptr;
} else {
Object->rpc_entry = std::make_shared<CommandManager::promise_type_t>();
}
OutStandingRequests_[Idx] = Object;
}
if(DeviceRegistry()->SendFrame(SerialNumber, ToSend.str())) {
Sent=true;
return Object->rpc_entry;
}
return nullptr;
}
void CommandManager::PostCommandResult(const std::string &SerialNumber, Poco::JSON::Object::Ptr Obj) {
if(!Obj->has(uCentralProtocol::ID)){
Logger().error(Poco::format("(%s): Invalid RPC response.",SerialNumber));
return;
}
uint64_t ID = Obj->get(uCentralProtocol::ID);
if(ID<2) {
Logger().error(Poco::format("(%s): Ignoring RPC response.",SerialNumber));
return;
}
std::lock_guard G(Mutex_);
auto Idx = CommandTagIndex{.Id = ID, .SerialNumber = SerialNumber};
auto RPC = OutStandingRequests_.find(Idx);
if (RPC == OutStandingRequests_.end()) {
Logger().warning(Poco::format("(%s): Outdated RPC %lu", SerialNumber, ID));
return;
}
std::chrono::duration<double, std::milli> rpc_execution_time = std::chrono::high_resolution_clock::now() - RPC->second->submitted;
StorageService()->CommandCompleted(RPC->second->uuid, Obj, rpc_execution_time, true);
if(RPC->second->rpc_entry) {
RPC->second->rpc_entry->set_value(Obj);
}
Logger().information(Poco::format("(%s): Received RPC answer %lu", SerialNumber, ID));
}
} // namespace