stephb9959
2022-10-30 09:07:27 -07:00
parent 5b546ea381
commit a6a9daa8a1
10 changed files with 64 additions and 26 deletions

View File

@@ -12,6 +12,7 @@
namespace OpenWifi {
int AutoDiscovery::Start() {
poco_information(Logger(),"Starting...");
Running_ = true;
Types::TopicNotifyFunction F = [this](const std::string &Key, const std::string &Payload) { this->ConnectionReceived(Key,Payload); };
ConnectionWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F);
@@ -20,10 +21,12 @@ namespace OpenWifi {
};
void AutoDiscovery::Stop() {
poco_information(Logger(),"Stopping...");
Running_ = false;
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, ConnectionWatcherId_);
Queue_.wakeUpAll();
Worker_.join();
poco_information(Logger(),"Stopped...");
};
void AutoDiscovery::run() {

View File

@@ -7,6 +7,7 @@
namespace OpenWifi {
int FileDownloader::Start() {
poco_information(Logger(),"Starting...");
TimerCallback_ = std::make_unique<Poco::TimerCallback<FileDownloader>>(*this,&FileDownloader::onTimer);
Timer_.setStartInterval( 20 * 1000); // first run in 20 seconds
Timer_.setPeriodicInterval(2 * 60 * 60 * 1000); // 1 hours
@@ -15,8 +16,9 @@ namespace OpenWifi {
}
void FileDownloader::Stop() {
poco_information(Logger(),"Stopping...");
Timer_.stop();
Logger().notice("Stopping.");
poco_information(Logger(),"Stopped...");
}
void FileDownloader::onTimer([[maybe_unused]] Poco::Timer &timer) {

View File

@@ -11,9 +11,8 @@ namespace OpenWifi {
void RegisterJobTypes();
int JobController::Start() {
poco_information(Logger(),"Starting...");
RegisterJobTypes();
if(!Running_)
Thr_.start(*this);
@@ -22,8 +21,10 @@ namespace OpenWifi {
void JobController::Stop() {
if(Running_) {
poco_information(Logger(),"Stopping...");
Running_ = false;
Thr_.join();
poco_information(Logger(),"Stopped...");
}
}

View File

@@ -11,6 +11,7 @@
namespace OpenWifi {
int Signup::Start() {
poco_information(Logger(),"Starting...");
GracePeriod_ = MicroServiceConfigGetInt("signup.graceperiod", 60*60);
LingerPeriod_ = MicroServiceConfigGetInt("signup.lingerperiod", 24*60*60);
@@ -31,10 +32,12 @@ namespace OpenWifi {
}
void Signup::Stop() {
poco_information(Logger(),"Stopping...");
Running_ = false;
Timer_.stop();
Worker_.wakeUp();
Worker_.join();
poco_information(Logger(),"Stopped...");
}
void Signup::onTimer([[maybe_unused]] Poco::Timer &timer) {

View File

@@ -14,6 +14,7 @@
namespace OpenWifi {
int Storage::Start() {
poco_information(Logger(),"Starting...");
std::lock_guard Guard(Mutex_);
StorageClass::Start();
@@ -111,8 +112,9 @@ namespace OpenWifi {
}
void Storage::Stop() {
poco_information(Logger(),"Stopping...");
Timer_.stop();
Logger().notice("Stopping.");
poco_information(Logger(),"Stopped...");
}
bool Storage::Validate(const Poco::URI::QueryParameters &P, RESTAPI::Errors::msg &Error) {

View File

@@ -48,6 +48,7 @@ namespace OpenWifi {
int ALBHealthCheckServer::Start() {
if(MicroServiceConfigGetBool("alb.enable",false)) {
poco_information(Logger(),"Starting...");
Running_=true;
Port_ = (int)MicroServiceConfigGetInt("alb.port",15015);
Socket_ = std::make_unique<Poco::Net::ServerSocket>(Port_);

View File

@@ -30,6 +30,7 @@ namespace OpenWifi {
};
void EventBusManager::Start() {
poco_information(Logger(),"Starting...");
if(KafkaManager()->Enabled()) {
Thread_.start(*this);
}

View File

@@ -67,8 +67,6 @@ namespace OpenWifi {
KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable",false);
}
inline void KafkaProducer::run() {
Utils::SetThreadName("Kafka:Prod");
@@ -185,6 +183,7 @@ namespace OpenWifi {
}
void KafkaProducer::Start() {
poco_information(Logger_,"Starting...");
if(!Running_) {
Running_=true;
Worker_.start(*this);
@@ -193,9 +192,11 @@ namespace OpenWifi {
void KafkaProducer::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
@@ -206,6 +207,7 @@ namespace OpenWifi {
void KafkaConsumer::Start() {
if(!Running_) {
poco_information(Logger_,"Starting...");
Running_=true;
Worker_.start(*this);
}
@@ -213,14 +215,17 @@ namespace OpenWifi {
void KafkaConsumer::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Worker_.wakeUp();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
void KafkaDispatcher::Start() {
if(!Running_) {
poco_information(Logger_,"Starting...");
Running_=true;
Worker_.start(*this);
}
@@ -228,9 +233,11 @@ namespace OpenWifi {
void KafkaDispatcher::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
@@ -296,18 +303,22 @@ namespace OpenWifi {
int KafkaManager::Start() {
if(!KafkaEnabled_)
return 0;
ConsumerThr_.Start();
ProducerThr_.Start();
Dispatcher_.Start();
poco_information(Logger(),"Starting...");
ConsumerThr_ = std::make_unique<KafkaConsumer>(Logger().create("KAFKA-CONSUMER",Logger().getChannel()));
ConsumerThr_->Start();
ProducerThr_ = std::make_unique<KafkaProducer>(Logger().create("KAFKA-PRODUCER",Logger().getChannel()));
ProducerThr_->Start();
Dispatcher_ = std::make_unique<KafkaDispatcher>(Logger().create("KAFKA-DISPATCHER",Logger().getChannel()));
Dispatcher_->Start();
return 0;
}
void KafkaManager::Stop() {
if(KafkaEnabled_) {
poco_information(Logger(),"Stopping...");
Dispatcher_.Stop();
ProducerThr_.Stop();
ConsumerThr_.Stop();
Dispatcher_->Stop();
ProducerThr_->Stop();
ConsumerThr_->Stop();
poco_information(Logger(),"Stopped...");
return;
}
@@ -315,12 +326,12 @@ namespace OpenWifi {
void KafkaManager::PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage ) {
if(KafkaEnabled_) {
ProducerThr_.Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
ProducerThr_->Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}
void KafkaManager::Dispatch(const std::string &Topic, const std::string & Key, const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
Dispatcher_->Dispatch(Topic, Key, Payload);
}
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
@@ -329,7 +340,7 @@ namespace OpenWifi {
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
if(KafkaEnabled_) {
return Dispatcher_.RegisterTopicWatcher(Topic,F);
return Dispatcher_->RegisterTopicWatcher(Topic,F);
} else {
return 0;
}
@@ -337,12 +348,12 @@ namespace OpenWifi {
void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
if(KafkaEnabled_) {
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
Dispatcher_->UnregisterTopicWatcher(Topic, Id);
}
}
void KafkaManager::Topics(std::vector<std::string> &T) {
Dispatcher_.Topics(T);
Dispatcher_->Topics(T);
}
void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) {

View File

@@ -34,13 +34,16 @@ namespace OpenWifi {
class KafkaProducer : public Poco::Runnable {
public:
KafkaProducer(Poco::Logger &L) :
Logger_(L) {
}
void run () override;
void Start();
void Stop();
void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
private:
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
@@ -49,18 +52,25 @@ namespace OpenWifi {
class KafkaConsumer : public Poco::Runnable {
public:
KafkaConsumer(Poco::Logger &L) :
Logger_(L) {
}
void run() override;
void Start();
void Stop();
private:
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
};
class KafkaDispatcher : public Poco::Runnable {
public:
KafkaDispatcher(Poco::Logger &L) :
Logger_(L) {
}
void Start();
void Stop();
@@ -71,6 +81,7 @@ namespace OpenWifi {
void Topics(std::vector<std::string> &T);
private:
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
@@ -104,11 +115,11 @@ namespace OpenWifi {
void Topics(std::vector<std::string> &T);
private:
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_;
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
std::unique_ptr<KafkaProducer> ProducerThr_;
std::unique_ptr<KafkaConsumer> ConsumerThr_;
std::unique_ptr<KafkaDispatcher> Dispatcher_;
void PartitionAssignment(const cppkafka::TopicPartitionList& partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList& partitions);

View File

@@ -80,6 +80,7 @@ namespace OpenWifi {
};
int UI_WebSocketClientServer::Start() {
poco_information(Logger(),"Starting...");
GoogleApiKey_ = MicroServiceConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty();
ReactorThread_.start(Reactor_);
@@ -89,12 +90,14 @@ namespace OpenWifi {
void UI_WebSocketClientServer::Stop() {
if(Running_) {
poco_information(Logger(),"Stopping...");
Clients_.clear();
Reactor_.stop();
ReactorThread_.join();
Running_ = false;
Thr_.wakeUp();
Thr_.join();
poco_information(Logger(),"Stopped...");
}
};