Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
This commit is contained in:
stephb9959
2022-07-07 10:16:35 -07:00
parent 4d2ccec1a8
commit cf441de197
8 changed files with 80 additions and 221 deletions

View File

@@ -125,7 +125,6 @@ add_executable(owprov
src/RESTAPI/RESTAPI_db_helpers.h
src/JobController.cpp src/JobController.h
src/JobRegistrations.cpp
src/storage/storage_jobs.cpp src/storage/storage_jobs.h
src/storage/storage_maps.cpp src/storage/storage_maps.h
src/RESTAPI/RESTAPI_map_handler.cpp src/RESTAPI/RESTAPI_map_handler.h
src/RESTAPI/RESTAPI_map_list_handler.cpp src/RESTAPI/RESTAPI_map_list_handler.h

2
build
View File

@@ -1 +1 @@
131
133

View File

@@ -30,6 +30,28 @@ namespace OpenWifi {
Utils::SetThreadName("job-controller");
while(Running_) {
Poco::Thread::trySleep(2000);
std::lock_guard G(Mutex_);
for(auto &job:jobs_) {
if(job!=nullptr) {
if(job->Started()==0 && Pool_.used()<Pool_.available()) {
std::cout << "Starting: " << job->Name() << " ID:" << job->JobId() << std::endl;
job->Start();
Pool_.start(*job);
}
}
}
for(auto it = jobs_.begin(); it!=jobs_.end();) {
if(*it!=nullptr && (*it)->Completed()!=0) {
auto tmp = it;
it = jobs_.erase(it);
delete *tmp;
} else {
++it;
}
}
}
}

View File

@@ -7,99 +7,45 @@
#include <vector>
#include <utility>
#include <functional>
#include <list>
#include "framework/MicroService.h"
namespace OpenWifi {
class Job {
public:
struct Parameter {
std::string name;
std::string value;
inline void to_json(Poco::JSON::Object &Obj) const {
RESTAPI_utils::field_to_json(Obj,"name",name);
RESTAPI_utils::field_to_json(Obj,"value",value);
}
class Job : public Poco::Runnable {
public:
Job(const std::string &JobID, const std::string &name, const std::vector<std::string> & parameters, uint64_t when, const SecurityObjects::UserInfo &UI, Poco::Logger &L) :
jobId_(JobID),
name_(name),
parameters_(parameters),
when_(when),
userinfo_(UI),
Logger_(L)
{};
inline bool from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
RESTAPI_utils::field_from_json(Obj,"name",name);
RESTAPI_utils::field_from_json(Obj,"value",value);
return true;
} catch (...) {
virtual void run() = 0;
[[nodiscard]] std::string Name() const { return name_; }
const SecurityObjects::UserInfo & UserInfo() const { return userinfo_; }
Poco::Logger & Logger() { return Logger_; }
const std::string & JobId() const { return jobId_; }
const std::string & Parameter(int x) const { return parameters_[x];}
uint64_t When() const { return when_; }
void Start() { started_ = OpenWifi::Now(); }
uint64_t Started() const { return started_; }
uint64_t Completed() const { return completed_;}
void Complete() { completed_ = OpenWifi::Now(); }
}
return false;
}
};
struct Status {
Types::UUID_t UUID;
uint64_t Start = 0 ;
uint64_t Progress = 0 ;
uint64_t Completed = 0 ;
std::string CurrentDisplay;
};
struct Result {
int Error=0;
std::string Reason;
};
typedef std::vector<Parameter> Parameters;
typedef std::vector<Parameters> ParametersVec;
typedef std::function<bool(const Parameters &Parameters, Result &Result, bool &Retry)> WorkerFunction;
typedef std::vector<Status> Statuses;
Job(std::string Title,
std::string Description,
std::string RegisteredName,
ParametersVec Parameters,
[[maybe_unused]] bool Parallel=true) :
Title_(std::move(Title)),
Description_(std::move(Description)),
RegisteredName_(std::move(RegisteredName)),
Parameters_(std::move(Parameters))
{
UUID_ = MicroService::instance().CreateUUID();
}
[[nodiscard]] inline const Types::UUID_t & ID() const { return UUID_; }
private:
Types::UUID_t UUID_;
std::string Title_;
std::string Description_;
std::string RegisteredName_;
ParametersVec Parameters_;
private:
std::string jobId_;
std::string name_;
std::vector<std::string> parameters_;
uint64_t when_=0;
SecurityObjects::UserInfo userinfo_;
Poco::Logger & Logger_;
uint64_t started_=0;
uint64_t completed_=0;
};
class JobRegistry {
public:
static auto instance() {
static auto instance_ = new JobRegistry;
return instance_;
}
inline void RegisterJobType( const std::string & JobType, Job::WorkerFunction Function) {
JobTypes_[JobType] = std::move(Function);
}
inline bool Execute(const std::string &JobType, const Job::Parameters & Params, Job::Result &Result, bool & Retry) {
auto Hint = JobTypes_.find(JobType);
if(Hint != end(JobTypes_)) {
Hint->second(Params, Result, Retry);
return true;
}
return false;
}
private:
std::map<std::string,Job::WorkerFunction> JobTypes_;
};
inline auto JobRegistry() { return JobRegistry::instance(); }
class JobController : public SubSystemServer, Poco::Runnable {
public:
static auto instance() {
@@ -112,11 +58,18 @@ namespace OpenWifi {
void run() override;
inline void wakeup() { Thr_.wakeUp(); }
bool JobList(Job::Statuses & Statuses);
void AddJob( Job* newJob ) {
std::lock_guard G(Mutex_);
jobs_.push_back(newJob);
}
private:
Poco::Thread Thr_;
std::atomic_bool Running_=false;
Poco::Thread Thr_;
std::atomic_bool Running_=false;
std::list<Job *> jobs_;
Poco::ThreadPool Pool_;
JobController() noexcept:
SubSystemServer("JobController", "JOB-SVR", "job")

View File

@@ -228,10 +228,10 @@ namespace OpenWifi{
Poco::JSON::Object Answer;
SNL.serialNumbers = Existing.devices;
auto Task = new VenueConfigUpdater(UUID,UserInfo_.userinfo,0,Logger());
auto JobId = Task->Start();
auto JobId = MicroService::instance().CreateUUID();
Types::StringVec Parameters{UUID};;
auto NewJob = new VenueConfigUpdater(JobId,"VenueConfigurationUpdater", Parameters, 0, UserInfo_.userinfo, Logger());
JobController()->AddJob(dynamic_cast<Job*>(NewJob));
SNL.to_json(Answer);
Answer.set("jobId",JobId);
return ReturnObject(Answer);

View File

@@ -9,6 +9,7 @@
#include "APConfig.h"
#include "sdks/SDK_gw.h"
#include "framework/WebSocketClientNotifications.h"
#include "JobController.h"
namespace OpenWifi {
@@ -82,45 +83,21 @@ namespace OpenWifi {
inline Poco::Logger & Logger() { return Logger_; }
};
class VenueConfigUpdater: public Poco::Runnable {
class VenueConfigUpdater: public Job {
public:
explicit VenueConfigUpdater(const std::string & VenueUUID, const SecurityObjects::UserInfo &UI, uint64_t When, Poco::Logger &L) :
VenueUUID_(VenueUUID),
UI_(UI),
When_(When),
Logger_(L)
{
VenueConfigUpdater(const std::string &JobID, const std::string &name, const std::vector<std::string> & parameters, uint64_t when, const SecurityObjects::UserInfo &UI, Poco::Logger &L) :
Job(JobID, name, parameters, when, UI, L) {
}
inline std::string Start() {
JobId_ = MicroService::CreateUUID();
Worker_.start(*this);
return JobId_;
}
private:
std::string VenueUUID_;
SecurityObjects::UserInfo UI_;
uint64_t When_;
Poco::Logger &Logger_;
Poco::Thread Worker_;
std::string JobId_;
Poco::ThreadPool Pool_{2,16,300};
inline Poco::Logger & Logger() { return Logger_; }
inline void run() final {
inline virtual void run() {
std::string VenueUUID_;
Utils::SetThreadName("venue-update");
if(When_ && When_>OpenWifi::Now())
Poco::Thread::trySleep( (long) (When_ - OpenWifi::Now()) * 1000 );
VenueUUID_ = Parameter(0);
WebSocketNotification<WebSocketNotificationJobContent> N;
Logger().information(fmt::format("Job {} Starting.", JobId_));
ProvObjects::Venue Venue;
uint64_t Updated = 0, Failed = 0 , BadConfigs = 0 ;
if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) {
@@ -131,7 +108,7 @@ namespace OpenWifi {
};
N.content.title = fmt::format("Updating {} configurations", Venue.info.name);
N.content.jobId = JobId_;
N.content.jobId = JobId();
std::array<tState,MaxThreads> Tasks;
@@ -200,19 +177,18 @@ namespace OpenWifi {
}
N.content.details = fmt::format("Job {} Completed: {} updated, {} failed to update, {} bad configurations. ",
JobId_, Updated ,Failed, BadConfigs);
JobId(), Updated ,Failed, BadConfigs);
} else {
N.content.details = fmt::format("Venue {} no longer exists.",VenueUUID_);
Logger().warning(N.content.details);
}
WebSocketClientNotificationVenueUpdateJobCompletionToUser(UI_.email, N);
WebSocketClientNotificationVenueUpdateJobCompletionToUser(UserInfo().email, N);
Logger().information(fmt::format("Job {} Completed: {} updated, {} failed to update , {} bad configurations.",
JobId_, Updated ,Failed, BadConfigs));
JobId(), Updated ,Failed, BadConfigs));
Utils::SetThreadName("free");
delete this;
Complete();
}
};

View File

@@ -1,51 +0,0 @@
//
// Created by stephane bourque on 2021-10-28.
//
#include "storage_jobs.h"
#include "framework/OpenWifiTypes.h"
#include "framework/MicroService.h"
namespace OpenWifi {
static ORM::FieldVec JobDB_Fields{
// object info
ORM::Field{"id",64, true},
ORM::Field{"name",ORM::FieldType::FT_TEXT},
ORM::Field{"description",ORM::FieldType::FT_TEXT},
ORM::Field{"type",ORM::FieldType::FT_TEXT},
ORM::Field{"progress",ORM::FieldType::FT_BIGINT},
ORM::Field{"total",ORM::FieldType::FT_BIGINT},
ORM::Field{"parameters",ORM::FieldType::FT_TEXT}
};
static ORM::IndexVec JobDB_Indexes{
{ std::string("job_name_index"),
ORM::IndexEntryVec{
{std::string("name"),
ORM::Indextype::ASC} } }
};
JobDB::JobDB( OpenWifi::DBType T, Poco::Data::SessionPool & P, Poco::Logger &L) :
DB(T, "jobs", JobDB_Fields, JobDB_Indexes, P, L, "job") {}
}
template<> void ORM::DB<OpenWifi::JobDBRecordType, OpenWifi::JobRecord>::Convert(const OpenWifi::JobDBRecordType &In, OpenWifi::JobRecord &Out) {
Out.id = In.get<0>();
Out.name = In.get<1>();
Out.description = In.get<2>();
Out.type = In.get<3>();
Out.progress = In.get<4>();
Out.total = In.get<5>();
Out.parameters = OpenWifi::RESTAPI_utils::to_array_of_array_of_object<OpenWifi::Job::Parameter>(In.get<3>());
}
template<> void ORM::DB<OpenWifi::JobDBRecordType, OpenWifi::JobRecord>::Convert(const OpenWifi::JobRecord &In, OpenWifi::JobDBRecordType &Out) {
Out.set<0>(In.id);
Out.set<1>(In.name);
Out.set<2>(In.description);
Out.set<3>(In.type);
Out.set<4>(In.progress);
Out.set<5>(In.total);
Out.set<6>(OpenWifi::RESTAPI_utils::to_string(In.parameters));
}

View File

@@ -1,40 +0,0 @@
//
// Created by stephane bourque on 2021-10-28.
//
#pragma once
#include "framework/orm.h"
#include "JobController.h"
namespace OpenWifi {
typedef Poco::Tuple<
std::string,
std::string,
std::string,
std::string,
uint64_t,
uint64_t,
std::string
> JobDBRecordType;
struct JobRecord {
Types::UUID_t id;
std::string name;
std::string description;
std::string type;
uint64_t progress;
uint64_t total;
Job::ParametersVec parameters;
// void from_string(const std::string &S);
// std::string to_string() const;
};
class JobDB : public ORM::DB<JobDBRecordType, JobRecord> {
public:
JobDB( OpenWifi::DBType T, Poco::Data::SessionPool & P, Poco::Logger &L);
virtual ~JobDB() {};
private:
};
}