diff --git a/src/Tasks/VenueConfigUpdater.h b/src/Tasks/VenueConfigUpdater.h index f842529..69a4a1a 100644 --- a/src/Tasks/VenueConfigUpdater.h +++ b/src/Tasks/VenueConfigUpdater.h @@ -20,8 +20,6 @@ namespace OpenWifi { auto Status = Results->get("status").extract(); auto Rejected = Status->getArray("rejected"); std::transform(Rejected->begin(),Rejected->end(),std::back_inserter(Warnings), [](auto i) -> auto { return i.toString(); }); -// for(const auto &i:*Rejected) - // Warnings.push_back(i.toString()); } } catch (...) { } @@ -106,78 +104,66 @@ namespace OpenWifi { ProvObjects::Venue Venue; uint64_t Updated = 0, Failed = 0 , BadConfigs = 0 ; if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { - const std::size_t MaxThreads=16; - struct tState { - Poco::Thread thr_; - VenueDeviceConfigUpdater *task= nullptr; - }; N.content.title = fmt::format("Updating {} configurations", Venue.info.name); N.content.jobId = JobId(); - std::array Tasks; + Poco::ThreadPool Pool_; + std::list JobList; + for(const auto &uuid:Venue.devices) { auto NewTask = new VenueDeviceConfigUpdater(uuid, Venue.info.name, Logger()); - // std::cout << "Scheduling config push for " << uuid << std::endl; - bool found_slot = false; - while (!found_slot) { - for (auto &cur_task: Tasks) { - if (cur_task.task == nullptr) { - cur_task.task = NewTask; - cur_task.thr_.start(*NewTask); - found_slot = true; - break; - } + bool TaskAdded=false; + while(!TaskAdded) { + if (Pool_.available()) { + JobList.push_back(NewTask); + Pool_.start(*NewTask); + TaskAdded = true; + continue; } + } - // Let's look for a slot... - if (!found_slot) { - for (auto &cur_task: Tasks) { - if (cur_task.task != nullptr && cur_task.task->started_) { - if (cur_task.thr_.isRunning()) - continue; - if (!cur_task.thr_.isRunning() && cur_task.task->done_) { - cur_task.thr_.join(); - Updated += cur_task.task->updated_; - Failed += cur_task.task->failed_; - BadConfigs += cur_task.task->bad_config_; - cur_task.task->started_ = cur_task.task->done_ = false; - delete cur_task.task; - cur_task.task = nullptr; - } - } + for(auto job_it = JobList.begin(); job_it !=JobList.end();) { + VenueDeviceConfigUpdater * current_job = *job_it; + if(current_job!= nullptr && current_job->done_) { + Updated += current_job->updated_; + Failed += current_job->failed_; + BadConfigs += current_job->bad_config_; + if(current_job->updated_) { + N.content.success.push_back(current_job->SerialNumber); + } else if(current_job->failed_) { + N.content.warning.push_back(current_job->SerialNumber); + } else { + N.content.error.push_back(current_job->SerialNumber); } + job_it = JobList.erase(job_it); + delete current_job; + } else { + ++job_it; } } } + Logger().debug("Waiting for outstanding update threads to finish."); - bool stillTasksRunning=true; - while(stillTasksRunning) { - stillTasksRunning = false; - for(auto &cur_task:Tasks) { - if(cur_task.task!= nullptr && cur_task.task->started_) { - if(cur_task.thr_.isRunning()) { - stillTasksRunning = true; - continue; - } - if(!cur_task.thr_.isRunning() && cur_task.task->done_) { - cur_task.thr_.join(); - if(cur_task.task->updated_) { - Updated++; - N.content.success.push_back(cur_task.task->SerialNumber); - } else if(cur_task.task->failed_) { - Failed++; - N.content.warning.push_back(cur_task.task->SerialNumber); - } else { - BadConfigs++; - N.content.error.push_back(cur_task.task->SerialNumber); - } - cur_task.task->started_ = cur_task.task->done_ = false; - delete cur_task.task; - cur_task.task = nullptr; - } + Pool_.joinAll(); + for(auto job_it = JobList.begin(); job_it !=JobList.end();) { + VenueDeviceConfigUpdater * current_job = *job_it; + if(current_job!= nullptr && current_job->done_) { + Updated += current_job->updated_; + Failed += current_job->failed_; + BadConfigs += current_job->bad_config_; + if(current_job->updated_) { + N.content.success.push_back(current_job->SerialNumber); + } else if(current_job->failed_) { + N.content.warning.push_back(current_job->SerialNumber); + } else { + N.content.error.push_back(current_job->SerialNumber); } + job_it = JobList.erase(job_it); + delete current_job; + } else { + ++job_it; } } diff --git a/src/Tasks/VenueRebooter.h b/src/Tasks/VenueRebooter.h index c277040..ae1bbc4 100644 --- a/src/Tasks/VenueRebooter.h +++ b/src/Tasks/VenueRebooter.h @@ -67,11 +67,10 @@ namespace OpenWifi { uint64_t rebooted_ = 0, failed_ = 0; if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { - Poco::ThreadPool Pool_; - N.content.title = fmt::format("Rebooting {} devices.", Venue.info.name); N.content.jobId = JobId(); + Poco::ThreadPool Pool_; std::list JobList; for(const auto &uuid:Venue.devices) { diff --git a/src/Tasks/VenueUpgrade.h b/src/Tasks/VenueUpgrade.h index 100d809..c5dde99 100644 --- a/src/Tasks/VenueUpgrade.h +++ b/src/Tasks/VenueUpgrade.h @@ -87,77 +87,60 @@ namespace OpenWifi { ProvObjects::Venue Venue; uint64_t upgraded_ = 0, failed_ = 0; if(StorageService()->VenueDB().GetRecord("id",VenueUUID_,Venue)) { - const std::size_t MaxThreads=16; - struct tState { - Poco::Thread thr_; - VenueDeviceUpgrade *task= nullptr; - }; N.content.title = fmt::format("Upgrading {} devices.", Venue.info.name); N.content.jobId = JobId(); - std::array Tasks; - ProvObjects::DeviceRules Rules; + Poco::ThreadPool Pool_; + std::list JobList; + ProvObjects::DeviceRules Rules; StorageService()->VenueDB().EvaluateDeviceRules(Venue.info.id, Rules); for(const auto &uuid:Venue.devices) { - auto NewTask = new VenueDeviceUpgrade(uuid, Venue.info.name, Rules,Logger()); - // std::cout << "Scheduling config push for " << uuid << std::endl; - bool found_slot = false; - while (!found_slot) { - for (auto &cur_task: Tasks) { - if (cur_task.task == nullptr) { - cur_task.task = NewTask; - cur_task.thr_.start(*NewTask); - found_slot = true; - break; - } + auto NewTask = new VenueDeviceUpgrade(uuid, Venue.info.name, Rules, Logger()); + bool TaskAdded = false; + while (!TaskAdded) { + if (Pool_.available()) { + JobList.push_back(NewTask); + Pool_.start(*NewTask); + TaskAdded = true; + continue; } + } - // Let's look for a slot... - if (!found_slot) { - for (auto &cur_task: Tasks) { - if (cur_task.task != nullptr && cur_task.task->started_) { - if (cur_task.thr_.isRunning()) - continue; - if (!cur_task.thr_.isRunning() && cur_task.task->done_) { - cur_task.thr_.join(); - upgraded_ += cur_task.task->upgraded_; - failed_ += cur_task.task->failed_; - cur_task.task->started_ = cur_task.task->done_ = false; - delete cur_task.task; - cur_task.task = nullptr; - } - } - } + for (auto job_it = JobList.begin(); job_it != JobList.end();) { + VenueDeviceUpgrade *current_job = *job_it; + if (current_job != nullptr && current_job->done_) { + if (current_job->upgraded_) + N.content.success.push_back(current_job->SerialNumber); + else + N.content.warning.push_back(current_job->SerialNumber); + upgraded_ += current_job->upgraded_; + failed_ += current_job->failed_; + job_it = JobList.erase(job_it); + delete current_job; + } else { + ++job_it; } } } - Logger().debug("Waiting for outstanding update threads to finish."); - bool stillTasksRunning=true; - while(stillTasksRunning) { - stillTasksRunning = false; - for(auto &cur_task:Tasks) { - if(cur_task.task!= nullptr && cur_task.task->started_) { - if(cur_task.thr_.isRunning()) { - stillTasksRunning = true; - continue; - } - if(!cur_task.thr_.isRunning() && cur_task.task->done_) { - cur_task.thr_.join(); - if(cur_task.task->upgraded_) { - upgraded_++; - N.content.success.push_back(cur_task.task->SerialNumber); - } else if(cur_task.task->failed_) { - failed_++; - N.content.warning.push_back(cur_task.task->SerialNumber); - } - cur_task.task->started_ = cur_task.task->done_ = false; - delete cur_task.task; - cur_task.task = nullptr; - } - } + + Logger().debug("Waiting for outstanding upgrade threads to finish."); + Pool_.joinAll(); + for(auto job_it = JobList.begin(); job_it !=JobList.end();) { + VenueDeviceUpgrade * current_job = *job_it; + if(current_job!= nullptr && current_job->done_) { + if(current_job->upgraded_) + N.content.success.push_back(current_job->SerialNumber); + else + N.content.warning.push_back(current_job->SerialNumber); + upgraded_ += current_job->upgraded_; + failed_ += current_job->failed_; + job_it = JobList.erase(job_it); + delete current_job; + } else { + ++job_it; } }