From a6a9daa8a1deb8bb1fddabe4e519be1f15467658 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 30 Oct 2022 09:07:27 -0700 Subject: [PATCH] https://telecominfraproject.atlassian.net/browse/WIFI-11303 Signed-off-by: stephb9959 --- src/AutoDiscovery.cpp | 3 ++ src/FileDownloader.cpp | 4 ++- src/JobController.cpp | 5 +-- src/Signup.cpp | 3 ++ src/StorageService.cpp | 4 ++- src/framework/ALBserver.cpp | 1 + src/framework/EventBusManager.cpp | 1 + src/framework/KafkaManager.cpp | 37 ++++++++++++++-------- src/framework/KafkaManager.h | 29 +++++++++++------ src/framework/UI_WebSocketClientServer.cpp | 3 ++ 10 files changed, 64 insertions(+), 26 deletions(-) diff --git a/src/AutoDiscovery.cpp b/src/AutoDiscovery.cpp index e564053..aa94320 100644 --- a/src/AutoDiscovery.cpp +++ b/src/AutoDiscovery.cpp @@ -12,6 +12,7 @@ namespace OpenWifi { int AutoDiscovery::Start() { + poco_information(Logger(),"Starting..."); Running_ = true; Types::TopicNotifyFunction F = [this](const std::string &Key, const std::string &Payload) { this->ConnectionReceived(Key,Payload); }; ConnectionWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F); @@ -20,10 +21,12 @@ namespace OpenWifi { }; void AutoDiscovery::Stop() { + poco_information(Logger(),"Stopping..."); Running_ = false; KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, ConnectionWatcherId_); Queue_.wakeUpAll(); Worker_.join(); + poco_information(Logger(),"Stopped..."); }; void AutoDiscovery::run() { diff --git a/src/FileDownloader.cpp b/src/FileDownloader.cpp index faf1654..7ef49da 100644 --- a/src/FileDownloader.cpp +++ b/src/FileDownloader.cpp @@ -7,6 +7,7 @@ namespace OpenWifi { int FileDownloader::Start() { + poco_information(Logger(),"Starting..."); TimerCallback_ = std::make_unique>(*this,&FileDownloader::onTimer); Timer_.setStartInterval( 20 * 1000); // first run in 20 seconds Timer_.setPeriodicInterval(2 * 60 * 60 * 1000); // 1 hours @@ -15,8 +16,9 @@ namespace OpenWifi { } void FileDownloader::Stop() { + poco_information(Logger(),"Stopping..."); Timer_.stop(); - Logger().notice("Stopping."); + poco_information(Logger(),"Stopped..."); } void FileDownloader::onTimer([[maybe_unused]] Poco::Timer &timer) { diff --git a/src/JobController.cpp b/src/JobController.cpp index b00b9c3..a5d0485 100644 --- a/src/JobController.cpp +++ b/src/JobController.cpp @@ -11,9 +11,8 @@ namespace OpenWifi { void RegisterJobTypes(); int JobController::Start() { - + poco_information(Logger(),"Starting..."); RegisterJobTypes(); - if(!Running_) Thr_.start(*this); @@ -22,8 +21,10 @@ namespace OpenWifi { void JobController::Stop() { if(Running_) { + poco_information(Logger(),"Stopping..."); Running_ = false; Thr_.join(); + poco_information(Logger(),"Stopped..."); } } diff --git a/src/Signup.cpp b/src/Signup.cpp index 482e95e..60f558f 100644 --- a/src/Signup.cpp +++ b/src/Signup.cpp @@ -11,6 +11,7 @@ namespace OpenWifi { int Signup::Start() { + poco_information(Logger(),"Starting..."); GracePeriod_ = MicroServiceConfigGetInt("signup.graceperiod", 60*60); LingerPeriod_ = MicroServiceConfigGetInt("signup.lingerperiod", 24*60*60); @@ -31,10 +32,12 @@ namespace OpenWifi { } void Signup::Stop() { + poco_information(Logger(),"Stopping..."); Running_ = false; Timer_.stop(); Worker_.wakeUp(); Worker_.join(); + poco_information(Logger(),"Stopped..."); } void Signup::onTimer([[maybe_unused]] Poco::Timer &timer) { diff --git a/src/StorageService.cpp b/src/StorageService.cpp index e1a3c7c..d5e0198 100644 --- a/src/StorageService.cpp +++ b/src/StorageService.cpp @@ -14,6 +14,7 @@ namespace OpenWifi { int Storage::Start() { + poco_information(Logger(),"Starting..."); std::lock_guard Guard(Mutex_); StorageClass::Start(); @@ -111,8 +112,9 @@ namespace OpenWifi { } void Storage::Stop() { + poco_information(Logger(),"Stopping..."); Timer_.stop(); - Logger().notice("Stopping."); + poco_information(Logger(),"Stopped..."); } bool Storage::Validate(const Poco::URI::QueryParameters &P, RESTAPI::Errors::msg &Error) { diff --git a/src/framework/ALBserver.cpp b/src/framework/ALBserver.cpp index 773fad8..4f3e1b1 100644 --- a/src/framework/ALBserver.cpp +++ b/src/framework/ALBserver.cpp @@ -48,6 +48,7 @@ namespace OpenWifi { int ALBHealthCheckServer::Start() { if(MicroServiceConfigGetBool("alb.enable",false)) { + poco_information(Logger(),"Starting..."); Running_=true; Port_ = (int)MicroServiceConfigGetInt("alb.port",15015); Socket_ = std::make_unique(Port_); diff --git a/src/framework/EventBusManager.cpp b/src/framework/EventBusManager.cpp index a32b4f6..967ca58 100644 --- a/src/framework/EventBusManager.cpp +++ b/src/framework/EventBusManager.cpp @@ -30,6 +30,7 @@ namespace OpenWifi { }; void EventBusManager::Start() { + poco_information(Logger(),"Starting..."); if(KafkaManager()->Enabled()) { Thread_.start(*this); } diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index a6c6b88..fa556bb 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -67,8 +67,6 @@ namespace OpenWifi { KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable",false); } - - inline void KafkaProducer::run() { Utils::SetThreadName("Kafka:Prod"); @@ -185,6 +183,7 @@ namespace OpenWifi { } void KafkaProducer::Start() { + poco_information(Logger_,"Starting..."); if(!Running_) { Running_=true; Worker_.start(*this); @@ -193,9 +192,11 @@ namespace OpenWifi { void KafkaProducer::Stop() { if(Running_) { + poco_information(Logger_,"Stopping..."); Running_=false; Queue_.wakeUpAll(); Worker_.join(); + poco_information(Logger_,"Stopped..."); } } @@ -206,6 +207,7 @@ namespace OpenWifi { void KafkaConsumer::Start() { if(!Running_) { + poco_information(Logger_,"Starting..."); Running_=true; Worker_.start(*this); } @@ -213,14 +215,17 @@ namespace OpenWifi { void KafkaConsumer::Stop() { if(Running_) { + poco_information(Logger_,"Stopping..."); Running_=false; Worker_.wakeUp(); Worker_.join(); + poco_information(Logger_,"Stopped..."); } } void KafkaDispatcher::Start() { if(!Running_) { + poco_information(Logger_,"Starting..."); Running_=true; Worker_.start(*this); } @@ -228,9 +233,11 @@ namespace OpenWifi { void KafkaDispatcher::Stop() { if(Running_) { + poco_information(Logger_,"Stopping..."); Running_=false; Queue_.wakeUpAll(); Worker_.join(); + poco_information(Logger_,"Stopped..."); } } @@ -296,18 +303,22 @@ namespace OpenWifi { int KafkaManager::Start() { if(!KafkaEnabled_) return 0; - ConsumerThr_.Start(); - ProducerThr_.Start(); - Dispatcher_.Start(); + poco_information(Logger(),"Starting..."); + ConsumerThr_ = std::make_unique(Logger().create("KAFKA-CONSUMER",Logger().getChannel())); + ConsumerThr_->Start(); + ProducerThr_ = std::make_unique(Logger().create("KAFKA-PRODUCER",Logger().getChannel())); + ProducerThr_->Start(); + Dispatcher_ = std::make_unique(Logger().create("KAFKA-DISPATCHER",Logger().getChannel())); + Dispatcher_->Start(); return 0; } void KafkaManager::Stop() { if(KafkaEnabled_) { poco_information(Logger(),"Stopping..."); - Dispatcher_.Stop(); - ProducerThr_.Stop(); - ConsumerThr_.Stop(); + Dispatcher_->Stop(); + ProducerThr_->Stop(); + ConsumerThr_->Stop(); poco_information(Logger(),"Stopped..."); return; } @@ -315,12 +326,12 @@ namespace OpenWifi { void KafkaManager::PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage ) { if(KafkaEnabled_) { - ProducerThr_.Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad); + ProducerThr_->Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad); } } void KafkaManager::Dispatch(const std::string &Topic, const std::string & Key, const std::string &Payload) { - Dispatcher_.Dispatch(Topic, Key, Payload); + Dispatcher_->Dispatch(Topic, Key, Payload); } [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { @@ -329,7 +340,7 @@ namespace OpenWifi { uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { if(KafkaEnabled_) { - return Dispatcher_.RegisterTopicWatcher(Topic,F); + return Dispatcher_->RegisterTopicWatcher(Topic,F); } else { return 0; } @@ -337,12 +348,12 @@ namespace OpenWifi { void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { if(KafkaEnabled_) { - Dispatcher_.UnregisterTopicWatcher(Topic, Id); + Dispatcher_->UnregisterTopicWatcher(Topic, Id); } } void KafkaManager::Topics(std::vector &T) { - Dispatcher_.Topics(T); + Dispatcher_->Topics(T); } void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) { diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index ebe75e2..126de6e 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -34,13 +34,16 @@ namespace OpenWifi { class KafkaProducer : public Poco::Runnable { public: - + KafkaProducer(Poco::Logger &L) : + Logger_(L) { + } void run () override; void Start(); void Stop(); void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload); private: + Poco::Logger &Logger_; std::recursive_mutex Mutex_; Poco::Thread Worker_; mutable std::atomic_bool Running_=false; @@ -49,18 +52,25 @@ namespace OpenWifi { class KafkaConsumer : public Poco::Runnable { public: + KafkaConsumer(Poco::Logger &L) : + Logger_(L) { + } void run() override; void Start(); void Stop(); private: - std::recursive_mutex Mutex_; - Poco::Thread Worker_; - mutable std::atomic_bool Running_=false; + Poco::Logger &Logger_; + std::recursive_mutex Mutex_; + Poco::Thread Worker_; + mutable std::atomic_bool Running_=false; }; class KafkaDispatcher : public Poco::Runnable { public: + KafkaDispatcher(Poco::Logger &L) : + Logger_(L) { + } void Start(); void Stop(); @@ -71,6 +81,7 @@ namespace OpenWifi { void Topics(std::vector &T); private: + Poco::Logger &Logger_; std::recursive_mutex Mutex_; Types::NotifyTable Notifiers_; Poco::Thread Worker_; @@ -104,11 +115,11 @@ namespace OpenWifi { void Topics(std::vector &T); private: - bool KafkaEnabled_ = false; - std::string SystemInfoWrapper_; - KafkaProducer ProducerThr_; - KafkaConsumer ConsumerThr_; - KafkaDispatcher Dispatcher_; + bool KafkaEnabled_ = false; + std::string SystemInfoWrapper_; + std::unique_ptr ProducerThr_; + std::unique_ptr ConsumerThr_; + std::unique_ptr Dispatcher_; void PartitionAssignment(const cppkafka::TopicPartitionList& partitions); void PartitionRevocation(const cppkafka::TopicPartitionList& partitions); diff --git a/src/framework/UI_WebSocketClientServer.cpp b/src/framework/UI_WebSocketClientServer.cpp index e537349..02c6d4a 100644 --- a/src/framework/UI_WebSocketClientServer.cpp +++ b/src/framework/UI_WebSocketClientServer.cpp @@ -80,6 +80,7 @@ namespace OpenWifi { }; int UI_WebSocketClientServer::Start() { + poco_information(Logger(),"Starting..."); GoogleApiKey_ = MicroServiceConfigGetString("google.apikey",""); GeoCodeEnabled_ = !GoogleApiKey_.empty(); ReactorThread_.start(Reactor_); @@ -89,12 +90,14 @@ namespace OpenWifi { void UI_WebSocketClientServer::Stop() { if(Running_) { + poco_information(Logger(),"Stopping..."); Clients_.clear(); Reactor_.stop(); ReactorThread_.join(); Running_ = false; Thr_.wakeUp(); Thr_.join(); + poco_information(Logger(),"Stopped..."); } };