Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
This commit is contained in:
stephb9959
2022-07-09 21:13:10 -07:00
parent 30b8665d7d
commit 46db18d7cd
3 changed files with 87 additions and 119 deletions

View File

@@ -20,8 +20,6 @@ namespace OpenWifi {
auto Status = Results->get("status").extract<Poco::JSON::Object::Ptr>(); auto Status = Results->get("status").extract<Poco::JSON::Object::Ptr>();
auto Rejected = Status->getArray("rejected"); auto Rejected = Status->getArray("rejected");
std::transform(Rejected->begin(),Rejected->end(),std::back_inserter(Warnings), [](auto i) -> auto { return i.toString(); }); std::transform(Rejected->begin(),Rejected->end(),std::back_inserter(Warnings), [](auto i) -> auto { return i.toString(); });
// for(const auto &i:*Rejected)
// Warnings.push_back(i.toString());
} }
} catch (...) { } catch (...) {
} }
@@ -106,78 +104,66 @@ namespace OpenWifi {
ProvObjects::Venue Venue; ProvObjects::Venue Venue;
uint64_t Updated = 0, Failed = 0 , BadConfigs = 0 ; uint64_t Updated = 0, Failed = 0 , BadConfigs = 0 ;
if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) {
const std::size_t MaxThreads=16;
struct tState {
Poco::Thread thr_;
VenueDeviceConfigUpdater *task= nullptr;
};
N.content.title = fmt::format("Updating {} configurations", Venue.info.name); N.content.title = fmt::format("Updating {} configurations", Venue.info.name);
N.content.jobId = JobId(); N.content.jobId = JobId();
std::array<tState,MaxThreads> Tasks; Poco::ThreadPool Pool_;
std::list<VenueDeviceConfigUpdater*> JobList;
for(const auto &uuid:Venue.devices) { for(const auto &uuid:Venue.devices) {
auto NewTask = new VenueDeviceConfigUpdater(uuid, Venue.info.name, Logger()); auto NewTask = new VenueDeviceConfigUpdater(uuid, Venue.info.name, Logger());
// std::cout << "Scheduling config push for " << uuid << std::endl; bool TaskAdded=false;
bool found_slot = false; while(!TaskAdded) {
while (!found_slot) { if (Pool_.available()) {
for (auto &cur_task: Tasks) { JobList.push_back(NewTask);
if (cur_task.task == nullptr) { Pool_.start(*NewTask);
cur_task.task = NewTask; TaskAdded = true;
cur_task.thr_.start(*NewTask); continue;
found_slot = true;
break;
}
} }
}
// Let's look for a slot... for(auto job_it = JobList.begin(); job_it !=JobList.end();) {
if (!found_slot) { VenueDeviceConfigUpdater * current_job = *job_it;
for (auto &cur_task: Tasks) { if(current_job!= nullptr && current_job->done_) {
if (cur_task.task != nullptr && cur_task.task->started_) { Updated += current_job->updated_;
if (cur_task.thr_.isRunning()) Failed += current_job->failed_;
continue; BadConfigs += current_job->bad_config_;
if (!cur_task.thr_.isRunning() && cur_task.task->done_) { if(current_job->updated_) {
cur_task.thr_.join(); N.content.success.push_back(current_job->SerialNumber);
Updated += cur_task.task->updated_; } else if(current_job->failed_) {
Failed += cur_task.task->failed_; N.content.warning.push_back(current_job->SerialNumber);
BadConfigs += cur_task.task->bad_config_; } else {
cur_task.task->started_ = cur_task.task->done_ = false; N.content.error.push_back(current_job->SerialNumber);
delete cur_task.task;
cur_task.task = nullptr;
}
}
} }
job_it = JobList.erase(job_it);
delete current_job;
} else {
++job_it;
} }
} }
} }
Logger().debug("Waiting for outstanding update threads to finish."); Logger().debug("Waiting for outstanding update threads to finish.");
bool stillTasksRunning=true; Pool_.joinAll();
while(stillTasksRunning) { for(auto job_it = JobList.begin(); job_it !=JobList.end();) {
stillTasksRunning = false; VenueDeviceConfigUpdater * current_job = *job_it;
for(auto &cur_task:Tasks) { if(current_job!= nullptr && current_job->done_) {
if(cur_task.task!= nullptr && cur_task.task->started_) { Updated += current_job->updated_;
if(cur_task.thr_.isRunning()) { Failed += current_job->failed_;
stillTasksRunning = true; BadConfigs += current_job->bad_config_;
continue; if(current_job->updated_) {
} N.content.success.push_back(current_job->SerialNumber);
if(!cur_task.thr_.isRunning() && cur_task.task->done_) { } else if(current_job->failed_) {
cur_task.thr_.join(); N.content.warning.push_back(current_job->SerialNumber);
if(cur_task.task->updated_) { } else {
Updated++; N.content.error.push_back(current_job->SerialNumber);
N.content.success.push_back(cur_task.task->SerialNumber);
} else if(cur_task.task->failed_) {
Failed++;
N.content.warning.push_back(cur_task.task->SerialNumber);
} else {
BadConfigs++;
N.content.error.push_back(cur_task.task->SerialNumber);
}
cur_task.task->started_ = cur_task.task->done_ = false;
delete cur_task.task;
cur_task.task = nullptr;
}
} }
job_it = JobList.erase(job_it);
delete current_job;
} else {
++job_it;
} }
} }

View File

@@ -67,11 +67,10 @@ namespace OpenWifi {
uint64_t rebooted_ = 0, failed_ = 0; uint64_t rebooted_ = 0, failed_ = 0;
if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) {
Poco::ThreadPool Pool_;
N.content.title = fmt::format("Rebooting {} devices.", Venue.info.name); N.content.title = fmt::format("Rebooting {} devices.", Venue.info.name);
N.content.jobId = JobId(); N.content.jobId = JobId();
Poco::ThreadPool Pool_;
std::list<VenueDeviceRebooter*> JobList; std::list<VenueDeviceRebooter*> JobList;
for(const auto &uuid:Venue.devices) { for(const auto &uuid:Venue.devices) {

View File

@@ -87,77 +87,60 @@ namespace OpenWifi {
ProvObjects::Venue Venue; ProvObjects::Venue Venue;
uint64_t upgraded_ = 0, failed_ = 0; uint64_t upgraded_ = 0, failed_ = 0;
if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) {
const std::size_t MaxThreads=16;
struct tState {
Poco::Thread thr_;
VenueDeviceUpgrade *task= nullptr;
};
N.content.title = fmt::format("Upgrading {} devices.", Venue.info.name); N.content.title = fmt::format("Upgrading {} devices.", Venue.info.name);
N.content.jobId = JobId(); N.content.jobId = JobId();
std::array<tState,MaxThreads> Tasks; Poco::ThreadPool Pool_;
ProvObjects::DeviceRules Rules; std::list<VenueDeviceUpgrade*> JobList;
ProvObjects::DeviceRules Rules;
StorageService()->VenueDB().EvaluateDeviceRules(Venue.info.id, Rules); StorageService()->VenueDB().EvaluateDeviceRules(Venue.info.id, Rules);
for(const auto &uuid:Venue.devices) { for(const auto &uuid:Venue.devices) {
auto NewTask = new VenueDeviceUpgrade(uuid, Venue.info.name, Rules,Logger()); auto NewTask = new VenueDeviceUpgrade(uuid, Venue.info.name, Rules, Logger());
// std::cout << "Scheduling config push for " << uuid << std::endl; bool TaskAdded = false;
bool found_slot = false; while (!TaskAdded) {
while (!found_slot) { if (Pool_.available()) {
for (auto &cur_task: Tasks) { JobList.push_back(NewTask);
if (cur_task.task == nullptr) { Pool_.start(*NewTask);
cur_task.task = NewTask; TaskAdded = true;
cur_task.thr_.start(*NewTask); continue;
found_slot = true;
break;
}
} }
}
// Let's look for a slot... for (auto job_it = JobList.begin(); job_it != JobList.end();) {
if (!found_slot) { VenueDeviceUpgrade *current_job = *job_it;
for (auto &cur_task: Tasks) { if (current_job != nullptr && current_job->done_) {
if (cur_task.task != nullptr && cur_task.task->started_) { if (current_job->upgraded_)
if (cur_task.thr_.isRunning()) N.content.success.push_back(current_job->SerialNumber);
continue; else
if (!cur_task.thr_.isRunning() && cur_task.task->done_) { N.content.warning.push_back(current_job->SerialNumber);
cur_task.thr_.join(); upgraded_ += current_job->upgraded_;
upgraded_ += cur_task.task->upgraded_; failed_ += current_job->failed_;
failed_ += cur_task.task->failed_; job_it = JobList.erase(job_it);
cur_task.task->started_ = cur_task.task->done_ = false; delete current_job;
delete cur_task.task; } else {
cur_task.task = nullptr; ++job_it;
}
}
}
} }
} }
} }
Logger().debug("Waiting for outstanding update threads to finish.");
bool stillTasksRunning=true; Logger().debug("Waiting for outstanding upgrade threads to finish.");
while(stillTasksRunning) { Pool_.joinAll();
stillTasksRunning = false; for(auto job_it = JobList.begin(); job_it !=JobList.end();) {
for(auto &cur_task:Tasks) { VenueDeviceUpgrade * current_job = *job_it;
if(cur_task.task!= nullptr && cur_task.task->started_) { if(current_job!= nullptr && current_job->done_) {
if(cur_task.thr_.isRunning()) { if(current_job->upgraded_)
stillTasksRunning = true; N.content.success.push_back(current_job->SerialNumber);
continue; else
} N.content.warning.push_back(current_job->SerialNumber);
if(!cur_task.thr_.isRunning() && cur_task.task->done_) { upgraded_ += current_job->upgraded_;
cur_task.thr_.join(); failed_ += current_job->failed_;
if(cur_task.task->upgraded_) { job_it = JobList.erase(job_it);
upgraded_++; delete current_job;
N.content.success.push_back(cur_task.task->SerialNumber); } else {
} else if(cur_task.task->failed_) { ++job_it;
failed_++;
N.content.warning.push_back(cur_task.task->SerialNumber);
}
cur_task.task->started_ = cur_task.task->done_ = false;
delete cur_task.task;
cur_task.task = nullptr;
}
}
} }
} }