Fixing shutdown crash

This commit is contained in:
stephb9959
2021-11-14 14:18:02 -08:00
parent cca4441ac7
commit 5e35906aec
11 changed files with 2653 additions and 101 deletions

2
build
View File

@@ -1 +1 @@
78
80

View File

@@ -18,8 +18,8 @@ namespace OpenWifi {
};
static ActionLinkManager * instance() {
static ActionLinkManager instance;
return &instance;
static auto * instance_ = new ActionLinkManager;
return instance_;
}
int Start() final;

View File

@@ -45,8 +45,8 @@ namespace OpenWifi{
static int AccessTypeToInt(ACCESS_TYPE T);
static AuthService *instance() {
static AuthService instance;
return &instance;
static auto * instance_ = new AuthService;
return instance_;
}
int Start() override;

View File

@@ -24,8 +24,8 @@ namespace OpenWifi {
int Start() override;
void Stop() override;
static MFAServer *instance() {
static MFAServer instance;
return &instance;
static auto * instance_ = new MFAServer;
return instance_;
}
bool StartMFAChallenge(const SecurityObjects::UserInfoAndPolicy &UInfo, Poco::JSON::Object &Challenge);

View File

@@ -25,8 +25,8 @@ namespace OpenWifi {
class SMSSender : public SubSystemServer {
public:
static SMSSender *instance() {
static SMSSender instance;
return &instance;
static auto *instance_ = new SMSSender;
return instance_;
}
int Start() final;

View File

@@ -59,8 +59,8 @@ namespace OpenWifi {
class SMTPMailerService : public SubSystemServer, Poco::Runnable {
public:
static SMTPMailerService *instance() {
static SMTPMailerService instance;
return & instance;
static auto * instance_ = new SMTPMailerService;
return instance_;
}
struct MessageEvent {

View File

@@ -84,8 +84,8 @@ namespace OpenWifi {
}
static Storage *instance() {
static Storage instance;
return &instance;
static auto * instance_ = new Storage;
return instance_;
}
int Start() override;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,46 @@
//
// Created by stephane bourque on 2021-09-14.
//
#ifndef OWPROV_CONFIGURATIONVALIDATOR_H
#define OWPROV_CONFIGURATIONVALIDATOR_H
#include <nlohmann/json-schema.hpp>
#include "framework/MicroService.h"
using nlohmann::json;
using nlohmann::json_schema::json_validator;
namespace OpenWifi {
class ConfigurationValidator : public SubSystemServer {
public:
static ConfigurationValidator *instance() {
if(instance_== nullptr)
instance_ = new ConfigurationValidator;
return instance_;
}
bool Validate(const std::string &C, std::string &Error);
static void my_format_checker(const std::string &format, const std::string &value);
int Start() override;
void Stop() override;
void reinitialize(Poco::Util::Application &self) override;
private:
static ConfigurationValidator * instance_;
bool Initialized_=false;
bool Working_=false;
void Init();
std::unique_ptr<json_validator> Validator_=std::make_unique<json_validator>(nullptr, my_format_checker);
ConfigurationValidator():
SubSystemServer("configvalidator", "CFG-VALIDATOR", "config.validator") {
}
};
inline ConfigurationValidator * ConfigurationValidator() { return ConfigurationValidator::instance(); }
inline bool ValidateUCentralConfiguration(const std::string &C, std::string &Error) { return ConfigurationValidator::instance()->Validate(C, Error); }
}
#endif //OWPROV_CONFIGURATIONVALIDATOR_H

View File

@@ -90,8 +90,8 @@ namespace OpenWifi {
inline AppServiceRegistry();
static AppServiceRegistry & instance() {
static AppServiceRegistry instance;
return instance;
static AppServiceRegistry *instance_= new AppServiceRegistry;
return *instance_;
}
inline ~AppServiceRegistry() {
@@ -1435,8 +1435,8 @@ namespace OpenWifi {
};
static RESTAPI_RateLimiter *instance() {
static RESTAPI_RateLimiter instance;
return &instance;
static RESTAPI_RateLimiter * instance_ = new RESTAPI_RateLimiter;
return instance_;
}
inline int Start() final { return 0;};
@@ -2081,6 +2081,50 @@ namespace OpenWifi {
Poco::JSON::Object Body_;
};
class KafkaProducer : public Poco::Runnable {
public:
inline void run();
void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
void Stop() {
if(Running_) {
Running_=false;
Worker_.wakeUp();
Worker_.join();
}
}
private:
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
};
class KafkaConsumer : public Poco::Runnable {
public:
inline void run();
void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
void Stop() {
if(Running_) {
Running_=false;
Worker_.wakeUp();
Worker_.join();
}
}
private:
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
};
class KafkaManager : public SubSystemServer {
public:
struct KMessage {
@@ -2089,33 +2133,32 @@ namespace OpenWifi {
PayLoad;
};
friend class KafkaConsumer;
friend class KafkaProducer;
inline void initialize(Poco::Util::Application & self) override;
static KafkaManager *instance() {
static KafkaManager instance;
return &instance;
static KafkaManager * instance_ = new KafkaManager;
return instance_;
}
inline int Start() override {
if(!KafkaEnabled_)
return 0;
ProducerThr_ = std::make_unique<std::thread>([this]() { this->ProducerThr(); });
ConsumerThr_ = std::make_unique<std::thread>([this]() { this->ConsumerThr(); });
ConsumerThr_.Start();
ProducerThr_.Start();
return 0;
}
inline void Stop() override {
if(KafkaEnabled_) {
ProducerRunning_ = ConsumerRunning_ = false;
ProducerThr_->join();
ConsumerThr_->join();
ProducerThr_.Stop();
ConsumerThr_.Stop();
return;
}
}
inline void ProducerThr();
inline void ConsumerThr();
inline void PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage = true ) {
if(KafkaEnabled_) {
std::lock_guard G(Mutex_);
@@ -2168,18 +2211,13 @@ namespace OpenWifi {
// void WakeUp();
private:
std::mutex ProducerMutex_;
std::mutex ConsumerMutex_;
bool KafkaEnabled_ = false;
std::atomic_bool ProducerRunning_ = false;
std::atomic_bool ConsumerRunning_ = false;
std::queue<KMessage> Queue_;
std::string SystemInfoWrapper_;
std::unique_ptr<std::thread> ConsumerThr_;
std::unique_ptr<std::thread> ProducerThr_;
int FunctionId_=1;
Types::NotifyTable Notifiers_;
std::unique_ptr<cppkafka::Configuration> Config_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
inline void PartitionAssignment(const cppkafka::TopicPartitionList& partitions) {
Logger_.information(Poco::format("Partition assigned: %Lu...",(uint64_t )partitions.front().get_partition()));
@@ -2204,8 +2242,8 @@ namespace OpenWifi {
}
static AuthClient *instance() {
static AuthClient instance;
return &instance;
static AuthClient * instance_ = new AuthClient;
return instance_;
}
inline int Start() override {
@@ -2328,14 +2366,14 @@ namespace OpenWifi {
}
static ALBHealthCheckServer *instance() {
static ALBHealthCheckServer instance;
return &instance;
static ALBHealthCheckServer * instance = new ALBHealthCheckServer;
return instance;
}
inline int Start() override;
inline void Stop() override {
if(Server_)
if(Running_)
Server_->stop();
}
@@ -2343,6 +2381,7 @@ namespace OpenWifi {
std::unique_ptr<Poco::Net::HTTPServer> Server_;
std::unique_ptr<Poco::Net::ServerSocket> Socket_;
int Port_ = 0;
std::atomic_bool Running_=false;
};
inline ALBHealthCheckServer * ALBHealthCheckServer() { return ALBHealthCheckServer::instance(); }
@@ -2357,17 +2396,18 @@ namespace OpenWifi {
class RESTAPI_server : public SubSystemServer {
public:
static RESTAPI_server *instance() {
static RESTAPI_server instance;
return &instance;
static RESTAPI_server *instance_ = new RESTAPI_server;
return instance_;
}
int Start() override;
inline void Stop() override {
Logger_.information("Stopping ");
for( const auto & svr : RESTServers_ )
svr->stop();
Pool_.joinAll();
RESTServers_.clear();
}
inline void reinitialize(Poco::Util::Application &self) override;
inline Poco::Net::HTTPRequestHandler *CallServer(const char *Path) {
@@ -2422,7 +2462,7 @@ namespace OpenWifi {
if(!Svr.RootCA().empty())
Svr.LogCas(Logger_);
auto Params = new Poco::Net::HTTPServerParams;
Poco::Net::HTTPServerParams::Ptr Params = new Poco::Net::HTTPServerParams;
Params->setMaxThreads(50);
Params->setMaxQueued(200);
Params->setKeepAlive(true);
@@ -2439,8 +2479,8 @@ namespace OpenWifi {
public:
static RESTAPI_InternalServer *instance() {
static RESTAPI_InternalServer instance;
return &instance;
static RESTAPI_InternalServer *instance_ = new RESTAPI_InternalServer;
return instance_;
}
inline int Start() override;
@@ -2448,7 +2488,7 @@ namespace OpenWifi {
Logger_.information("Stopping ");
for( const auto & svr : RESTServers_ )
svr->stop();
RESTServers_.clear();
Pool_.stopAll();
}
inline void reinitialize(Poco::Util::Application &self) override;
@@ -2465,7 +2505,6 @@ namespace OpenWifi {
RESTAPI_InternalServer() noexcept: SubSystemServer("RESTAPIInternalServer", "REST-ISRV", "openwifi.internal.restapi")
{
}
};
inline RESTAPI_InternalServer * RESTAPI_InternalServer() { return RESTAPI_InternalServer::instance(); };
@@ -2483,7 +2522,7 @@ namespace OpenWifi {
}
private:
Poco::Logger & Logger_;
RESTAPI_GenericServer &Server_;
RESTAPI_GenericServer & Server_;
};
inline int RESTAPI_InternalServer::Start() {
@@ -2621,7 +2660,7 @@ namespace OpenWifi {
std::string ConfigFileName_;
Poco::UUIDGenerator UUIDGenerator_;
uint64_t ID_ = 1;
Poco::SharedPtr<Poco::Crypto::RSAKey> AppKey_ = nullptr;
Poco::SharedPtr<Poco::Crypto::RSAKey> AppKey_;
bool DebugMode_ = false;
std::string DataDir_;
std::string WWWAssetsDir_;
@@ -2923,8 +2962,9 @@ namespace OpenWifi {
inline void MicroService::StopSubSystemServers() {
BusEventManager_.Stop();
for(auto i=SubSystems_.rbegin(); i!=SubSystems_.rend(); ++i)
(*i)->Stop();
for(auto i=SubSystems_.rbegin(); i!=SubSystems_.rend(); ++i) {
(*i)->Stop();
}
}
[[nodiscard]] inline std::string MicroService::CreateUUID() {
@@ -2942,7 +2982,6 @@ namespace OpenWifi {
}
return true;
} else {
// std::cout << "Sub:" << SubSystem << " Level:" << Level << std::endl;
for (auto i : SubSystems_) {
if (Sub == Poco::toLower(i->Name())) {
i->Logger().setLevel(P);
@@ -3092,7 +3131,6 @@ namespace OpenWifi {
StartSubSystemServers();
waitForTerminationRequest();
StopSubSystemServers();
logger.notice(Poco::format("Stopped %s...",DAEMON_APP_NAME));
}
@@ -3172,6 +3210,7 @@ namespace OpenWifi {
inline int ALBHealthCheckServer::Start() {
if(MicroService::instance().ConfigGetBool("alb.enable",false)) {
Running_=true;
Port_ = (int)MicroService::instance().ConfigGetInt("alb.port",15015);
Socket_ = std::make_unique<Poco::Net::ServerSocket>(Port_);
auto Params = new Poco::Net::HTTPServerParams;
@@ -3216,41 +3255,42 @@ namespace OpenWifi {
KafkaEnabled_ = MicroService::instance().ConfigGetBool("openwifi.kafka.enable",false);
}
inline void KafkaManager::ProducerThr() {
inline void KafkaProducer::run() {
cppkafka::Configuration Config({
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") }
});
SystemInfoWrapper_ = R"lit({ "system" : { "id" : )lit" +
KafkaManager()->SystemInfoWrapper_ = R"lit({ "system" : { "id" : )lit" +
std::to_string(MicroService::instance().ID()) +
R"lit( , "host" : ")lit" + MicroService::instance().PrivateEndPoint() +
R"lit(" } , "payload" : )lit" ;
cppkafka::Producer Producer(Config);
ProducerRunning_ = true;
while(ProducerRunning_) {
Running_ = true;
while(Running_) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
try
{
std::lock_guard G(ProducerMutex_);
std::lock_guard G(Mutex_);
auto Num=0;
while (!Queue_.empty()) {
const auto M = Queue_.front();
while (!KafkaManager()->Queue_.empty()) {
const auto M = KafkaManager()->Queue_.front();
Producer.produce(
cppkafka::MessageBuilder(M.Topic).key(M.Key).payload(M.PayLoad));
Queue_.pop();
KafkaManager()->Queue_.pop();
Num++;
}
if(Num)
Producer.flush();
} catch (const cppkafka::HandleException &E ) {
Logger_.warning(Poco::format("Caught a Kafka exception (producer): %s",std::string{E.what()}));
KafkaManager()->Logger_.warning(Poco::format("Caught a Kafka exception (producer): %s",std::string{E.what()}));
} catch (const Poco::Exception &E) {
Logger_.log(E);
KafkaManager()->Logger_.log(E);
}
}
Producer.flush();
}
inline void KafkaManager::ConsumerThr() {
inline void KafkaConsumer::run() {
cppkafka::Configuration Config({
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") },
@@ -3270,13 +3310,13 @@ namespace OpenWifi {
cppkafka::Consumer Consumer(Config);
Consumer.set_assignment_callback([this](cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
Logger_.information(Poco::format("Partition assigned: %Lu...",
KafkaManager()->Logger_.information(Poco::format("Partition assigned: %Lu...",
(uint64_t)partitions.front().get_partition()));
}
});
Consumer.set_revocation_callback([this](const cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
Logger_.information(Poco::format("Partition revocation: %Lu...",
KafkaManager()->Logger_.information(Poco::format("Partition revocation: %Lu...",
(uint64_t)partitions.front().get_partition()));
}
});
@@ -3285,13 +3325,13 @@ namespace OpenWifi {
auto BatchSize = MicroService::instance().ConfigGetInt("openwifi.kafka.consumer.batchsize",20);
Types::StringVec Topics;
for(const auto &i:Notifiers_)
for(const auto &i:KafkaManager()->Notifiers_)
Topics.push_back(i.first);
Consumer.subscribe(Topics);
ConsumerRunning_ = true;
while(ConsumerRunning_) {
Running_ = true;
while(Running_) {
try {
std::vector<cppkafka::Message> MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(200));
for(auto const &Msg:MsgVec) {
@@ -3299,14 +3339,14 @@ namespace OpenWifi {
continue;
if (Msg.get_error()) {
if (!Msg.is_eof()) {
Logger_.error(Poco::format("Error: %s", Msg.get_error().to_string()));
KafkaManager()->Logger_.error(Poco::format("Error: %s", Msg.get_error().to_string()));
}if(!AutoCommit)
Consumer.async_commit(Msg);
continue;
}
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(Msg.get_topic());
if (It != Notifiers_.end()) {
std::lock_guard G(Mutex_);
auto It = KafkaManager()->Notifiers_.find(Msg.get_topic());
if (It != KafkaManager()->Notifiers_.end()) {
Types::TopicNotifyFunctionList &FL = It->second;
std::string Key{Msg.get_key()};
std::string Payload{Msg.get_payload()};
@@ -3319,11 +3359,12 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
Logger_.warning(Poco::format("Caught a Kafka exception (consumer): %s",std::string{E.what()}));
KafkaManager()->Logger_.warning(Poco::format("Caught a Kafka exception (consumer): %s",std::string{E.what()}));
} catch (const Poco::Exception &E) {
Logger_.log(E);
KafkaManager()->Logger_.log(E);
}
}
Consumer.unsubscribe();
}
inline void RESTAPI_server::reinitialize(Poco::Util::Application &self) {
@@ -3544,11 +3585,9 @@ namespace OpenWifi {
if(Response.getStatus()==Poco::Net::HTTPResponse::HTTP_OK) {
Poco::JSON::Parser P;
ResponseObject = P.parse(is).extract<Poco::JSON::Object::Ptr>();
// std::cout << "Response OK" << std::endl;
} else {
Poco::JSON::Parser P;
ResponseObject = P.parse(is).extract<Poco::JSON::Object::Ptr>();
// std::cout << "Response: " << Response.getStatus() << std::endl;
}
return Response.getStatus();
}
@@ -3594,11 +3633,9 @@ namespace OpenWifi {
if(Response.getStatus()==Poco::Net::HTTPResponse::HTTP_OK) {
Poco::JSON::Parser P;
ResponseObject = P.parse(is).extract<Poco::JSON::Object::Ptr>();
// std::cout << "Response OK" << std::endl;
} else {
Poco::JSON::Parser P;
ResponseObject = P.parse(is).extract<Poco::JSON::Object::Ptr>();
// std::cout << "Response: " << Response.getStatus() << std::endl;
}
return Response.getStatus();
}

View File

@@ -26,13 +26,6 @@ namespace OpenWifi {
class StorageClass : public SubSystemServer {
public:
/* static StorageClass *instance() {
if (instance_ == nullptr) {
instance_ = new StorageClass;
}
return instance_;
}
*/
StorageClass() noexcept:
SubSystemServer("StorageClass", "STORAGE-SVR", "storage")
{
@@ -56,7 +49,7 @@ namespace OpenWifi {
}
void Stop() override {
Pool_->shutdown();
}
[[nodiscard]] inline std::string ComputeRange(uint64_t From, uint64_t HowMany) {
@@ -96,11 +89,11 @@ namespace OpenWifi {
inline int Setup_PostgreSQL();
protected:
std::unique_ptr<Poco::Data::SessionPool> Pool_;
std::unique_ptr<Poco::Data::SQLite::Connector> SQLiteConn_;
std::unique_ptr<Poco::Data::PostgreSQL::Connector> PostgresConn_;
std::unique_ptr<Poco::Data::MySQL::Connector> MySQLConn_;
DBType dbType_ = sqlite;
Poco::SharedPtr<Poco::Data::SessionPool> Pool_;
Poco::Data::SQLite::Connector SQLiteConn_;
Poco::Data::PostgreSQL::Connector PostgresConn_;
Poco::Data::MySQL::Connector MySQLConn_;
DBType dbType_ = sqlite;
};
#ifdef SMALL_BUILD
@@ -114,9 +107,8 @@ namespace OpenWifi {
auto DBName = MicroService::instance().DataDir() + "/" + MicroService::instance().ConfigGetString("storage.type.sqlite.db");
auto NumSessions = MicroService::instance().ConfigGetInt("storage.type.sqlite.maxsessions", 64);
auto IdleTime = MicroService::instance().ConfigGetInt("storage.type.sqlite.idletime", 60);
SQLiteConn_ = std::make_unique<Poco::Data::SQLite::Connector>();
SQLiteConn_->registerConnector();
Pool_ = std::make_unique<Poco::Data::SessionPool>(SQLiteConn_->name(), DBName, 4, NumSessions, IdleTime);
SQLiteConn_.registerConnector();
Pool_ = Poco::SharedPtr<Poco::Data::SessionPool>(new Poco::Data::SessionPool(SQLiteConn_.name(), DBName, 4, NumSessions, IdleTime));
return 0;
}
@@ -139,9 +131,8 @@ namespace OpenWifi {
";port=" + Port +
";compress=true;auto-reconnect=true";
MySQLConn_ = std::make_unique<Poco::Data::MySQL::Connector>();
MySQLConn_->registerConnector();
Pool_ = std::make_unique<Poco::Data::SessionPool>(MySQLConn_->name(), ConnectionStr, 4, NumSessions, IdleTime);
MySQLConn_.registerConnector();
Pool_ = Poco::SharedPtr<Poco::Data::SessionPool>(new Poco::Data::SessionPool(MySQLConn_.name(), ConnectionStr, 4, NumSessions, IdleTime));
return 0;
}
@@ -166,9 +157,8 @@ namespace OpenWifi {
" port=" + Port +
" connect_timeout=" + ConnectionTimeout;
PostgresConn_ = std::make_unique<Poco::Data::PostgreSQL::Connector>();
PostgresConn_->registerConnector();
Pool_ = std::make_unique<Poco::Data::SessionPool>(PostgresConn_->name(), ConnectionStr, 4, NumSessions, IdleTime);
PostgresConn_.registerConnector();
Pool_ = Poco::SharedPtr<Poco::Data::SessionPool>(new Poco::Data::SessionPool(PostgresConn_.name(), ConnectionStr, 4, NumSessions, IdleTime));
return 0;
}