From bc4da0aaeb5563a11a6016d4df261c5c73594f84 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sat, 2 Dec 2023 13:29:23 -0800 Subject: [PATCH 1/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- build | 2 +- src/framework/MicroService.cpp | 3 +- src/framework/StorageClass.h | 2 + src/framework/utils.h | 85 ++++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 2 deletions(-) diff --git a/build b/build index d8263ee..e440e5c 100644 --- a/build +++ b/build @@ -1 +1 @@ -2 \ No newline at end of file +3 \ No newline at end of file diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 5d517d9..26fb521 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -136,7 +136,8 @@ namespace OpenWifi { auto i = Services_.begin(); auto now = Utils::Now(); for (; i != Services_.end();) { - if ((now - i->second.LastUpdate) > 60) { + if ((now - i->second.LastUpdate) > 120) { + poco_warning(logger(), fmt::format("ZombieService: Removing service {}, ", i->second.PublicEndPoint)); i = Services_.erase(i); } else ++i; diff --git a/src/framework/StorageClass.h b/src/framework/StorageClass.h index cfb5530..36751bc 100644 --- a/src/framework/StorageClass.h +++ b/src/framework/StorageClass.h @@ -47,6 +47,8 @@ namespace OpenWifi { } + Poco::Data::SessionPool &Pool() { return *Pool_; } + private: inline int Setup_SQLite(); inline int Setup_MySQL(); diff --git a/src/framework/utils.h b/src/framework/utils.h index fa0d58d..b20d32a 100644 --- a/src/framework/utils.h +++ b/src/framework/utils.h @@ -316,5 +316,90 @@ namespace OpenWifi::Utils { uint32_t Port; }; + class CompressedString { + public: + CompressedString() { + DecompressedSize_ = 0; + }; + + explicit CompressedString(const std::string &Data) : DecompressedSize_(Data.size()) { + CompressIt(Data); + } + + CompressedString(const CompressedString &Data) { + this->DecompressedSize_ = Data.DecompressedSize_; + this->CompressedData_ = Data.CompressedData_; + } + + CompressedString& operator=(const CompressedString& rhs) { + if (this != &rhs) { + this->DecompressedSize_ = rhs.DecompressedSize_; + this->CompressedData_ = rhs.CompressedData_; + } + return *this; + } + + CompressedString& operator=(CompressedString&& rhs) { + if (this != &rhs) { + this->DecompressedSize_ = rhs.DecompressedSize_; + this->CompressedData_ = rhs.CompressedData_; + } + return *this; + } + + ~CompressedString() = default; + + operator std::string() const { + return DecompressIt(); + } + + CompressedString &operator=(const std::string &Data) { + DecompressedSize_ = Data.size(); + CompressIt(Data); + return *this; + } + + auto CompressedSize() const { return CompressedData_.size(); } + auto DecompressedSize() const { return DecompressedSize_; } + + private: + std::string CompressedData_; + std::size_t DecompressedSize_; + + inline void CompressIt(const std::string &Data) { + z_stream strm; // = {0}; + CompressedData_.resize(Data.size()); + strm.next_in = (Bytef *)Data.data(); + strm.avail_in = Data.size(); + strm.next_out = (Bytef *)CompressedData_.data(); + strm.avail_out = Data.size(); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + deflate(&strm, Z_FINISH); + deflateEnd(&strm); + CompressedData_.resize(strm.total_out); + } + + [[nodiscard]] std::string DecompressIt() const { + std::string Result; + if(DecompressedSize_!=0) { + Result.resize(DecompressedSize_); + z_stream strm ; //= {0}; + strm.next_in = (Bytef *)CompressedData_.data(); + strm.avail_in = CompressedData_.size(); + strm.next_out = (Bytef *)Result.data(); + strm.avail_out = Result.size(); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + inflateInit2(&strm, 15 + 32); + inflate(&strm, Z_FINISH); + inflateEnd(&strm); + } + return Result; + } + }; } // namespace OpenWifi::Utils From 7d5c130d5c3cce1b71f6b043a4b8447ef720eb9d Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 3 Dec 2023 12:05:20 -0800 Subject: [PATCH 2/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- build | 2 +- src/framework/MicroService.cpp | 98 ++++++++++++++++++++-------------- 2 files changed, 59 insertions(+), 41 deletions(-) diff --git a/build b/build index e440e5c..bf0d87a 100644 --- a/build +++ b/build @@ -1 +1 @@ -3 \ No newline at end of file +4 \ No newline at end of file diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 26fb521..30460fe 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -33,6 +33,17 @@ namespace OpenWifi { void MicroService::Exit(int Reason) { std::exit(Reason); } + static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) { + std::string SvcList; + for (const auto &Svc : Services) { + if (SvcList.empty()) + SvcList = Svc.second.Type; + else + SvcList += ", " + Svc.second.Type; + } + return SvcList; + } + void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key, const std::string &Payload) { std::lock_guard G(InfraMutex_); @@ -55,13 +66,10 @@ namespace OpenWifi { Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) { auto PrivateEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(); - if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && - Services_.find(PrivateEndPoint) != Services_.end()) { - Services_[PrivateEndPoint].LastUpdate = Utils::Now(); - } else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { + if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { Services_.erase(PrivateEndPoint); - poco_debug( - logger(), + poco_information( + Logger_, fmt::format( "Service {} ID={} leaving system.", Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) @@ -69,14 +77,7 @@ namespace OpenWifi { ID)); } else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN || Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) { - poco_debug( - logger(), - fmt::format( - "Service {} ID={} joining system.", - Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) - .toString(), - ID)); - Services_[PrivateEndPoint] = Types::MicroServiceMeta{ + auto ServiceInfo = Types::MicroServiceMeta{ .Id = ID, .Type = Poco::toLower( Object->get(KafkaTopics::ServiceEvents::Fields::TYPE) @@ -94,20 +95,31 @@ namespace OpenWifi { .toString(), .LastUpdate = Utils::Now()}; - std::string SvcList; - for (const auto &Svc : Services_) { - if (SvcList.empty()) - SvcList = Svc.second.Type; - else - SvcList += ", " + Svc.second.Type; + auto s1 = MakeServiceListString(Services_); + Services_[PrivateEndPoint] = ServiceInfo; + if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) { + poco_information( + Logger_, + fmt::format( + "Service {} ID={} is joining the system. old={}", + Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) + .toString(), + ID, s1)); + std::string SvcList; + for (const auto &Svc : Services_) { + if (SvcList.empty()) + SvcList = Svc.second.Type; + else + SvcList += ", " + Svc.second.Type; + } + poco_information( + Logger_, + fmt::format("Current list of microservices: {}", SvcList)); } - poco_information( - logger(), - fmt::format("Current list of microservices: {}", SvcList)); } } else { - poco_error( - logger(), + poco_information( + Logger_, fmt::format("KAFKA-MSG: invalid event '{}', missing a field.", Event)); } @@ -118,33 +130,39 @@ namespace OpenWifi { Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString()); #endif } else { - poco_error( - logger(), + poco_information( + Logger_, fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event)); } } else { - poco_error(logger(), + poco_information(Logger_, fmt::format("Unknown Event: {} Source: {}", Event, ID)); } } } else { - poco_error(logger(), "Bad bus message."); - std::ostringstream os; - Object->stringify(std::cout); + std::ostringstream os; + Object->stringify(std::cout); + poco_error(Logger_, fmt::format("Bad bus message: {}", os.str())); } - auto i = Services_.begin(); + auto ServiceHint = Services_.begin(); auto now = Utils::Now(); - for (; i != Services_.end();) { - if ((now - i->second.LastUpdate) > 120) { - poco_warning(logger(), fmt::format("ZombieService: Removing service {}, ", i->second.PublicEndPoint)); - i = Services_.erase(i); + auto si1 = Services_.size(); + auto ss1 = MakeServiceListString(Services_); + while(ServiceHint!=Services_.end()) { + if ((now - ServiceHint->second.LastUpdate) > 120) { + poco_information(Logger_, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint)); + ServiceHint = Services_.erase(ServiceHint); } else - ++i; + ++ServiceHint; } + if(Services_.size() != si1) { + auto ss2 = MakeServiceListString(Services_); + poco_information(Logger_, fmt::format("Current list of microservices: {} -> {}", ss1, ss2)); + } } catch (const Poco::Exception &E) { - logger().log(E); + Logger_.log(E); } } @@ -413,7 +431,7 @@ namespace OpenWifi { try { DataDir.createDirectory(); } catch (const Poco::Exception &E) { - logger().log(E); + Logger_.log(E); } } WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", ""); @@ -698,7 +716,7 @@ namespace OpenWifi { auto APIKEY = Request.get("X-API-KEY"); return APIKEY == MyHash_; } catch (const Poco::Exception &E) { - logger().log(E); + Logger_.log(E); } return false; } From 13d2d39aede91c94e7619ec21e747d50ad7a68fd Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 3 Dec 2023 13:23:00 -0800 Subject: [PATCH 3/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- src/framework/MicroService.cpp | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 30460fe..6c1aed3 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -47,6 +47,9 @@ namespace OpenWifi { void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key, const std::string &Payload) { std::lock_guard G(InfraMutex_); + + Poco::Logger &BusLogger = Poco::Logger::create( + "BusMessageReceived", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel()); try { Poco::JSON::Parser P; auto Object = P.parse(Payload).extract(); @@ -69,7 +72,7 @@ namespace OpenWifi { if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { Services_.erase(PrivateEndPoint); poco_information( - Logger_, + BusLogger, fmt::format( "Service {} ID={} leaving system.", Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) @@ -99,7 +102,7 @@ namespace OpenWifi { Services_[PrivateEndPoint] = ServiceInfo; if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) { poco_information( - Logger_, + BusLogger, fmt::format( "Service {} ID={} is joining the system. old={}", Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) @@ -113,13 +116,13 @@ namespace OpenWifi { SvcList += ", " + Svc.second.Type; } poco_information( - Logger_, + BusLogger, fmt::format("Current list of microservices: {}", SvcList)); } } } else { poco_information( - Logger_, + BusLogger, fmt::format("KAFKA-MSG: invalid event '{}', missing a field.", Event)); } @@ -131,18 +134,18 @@ namespace OpenWifi { #endif } else { poco_information( - Logger_, + BusLogger, fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event)); } } else { - poco_information(Logger_, + poco_information(BusLogger, fmt::format("Unknown Event: {} Source: {}", Event, ID)); } } } else { std::ostringstream os; Object->stringify(std::cout); - poco_error(Logger_, fmt::format("Bad bus message: {}", os.str())); + poco_error(BusLogger, fmt::format("Bad bus message: {}", os.str())); } auto ServiceHint = Services_.begin(); @@ -151,18 +154,18 @@ namespace OpenWifi { auto ss1 = MakeServiceListString(Services_); while(ServiceHint!=Services_.end()) { if ((now - ServiceHint->second.LastUpdate) > 120) { - poco_information(Logger_, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint)); + poco_information(BusLogger, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint)); ServiceHint = Services_.erase(ServiceHint); } else ++ServiceHint; } if(Services_.size() != si1) { auto ss2 = MakeServiceListString(Services_); - poco_information(Logger_, fmt::format("Current list of microservices: {} -> {}", ss1, ss2)); + poco_information(BusLogger, fmt::format("Current list of microservices: {} -> {}", ss1, ss2)); } } catch (const Poco::Exception &E) { - Logger_.log(E); + BusLogger.log(E); } } From a283f31d7fa2b0224980e0fdf040faf25a2baebf Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Mon, 4 Dec 2023 07:41:06 -0800 Subject: [PATCH 4/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- build | 2 +- src/framework/EventBusManager.cpp | 2 -- src/framework/EventBusManager.h | 12 ++++++++++++ src/framework/MicroService.cpp | 10 ++++------ src/framework/MicroService.h | 1 - 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/build b/build index bf0d87a..7813681 100644 --- a/build +++ b/build @@ -1 +1 @@ -4 \ No newline at end of file +5 \ No newline at end of file diff --git a/src/framework/EventBusManager.cpp b/src/framework/EventBusManager.cpp index ca28ad9..66774f1 100644 --- a/src/framework/EventBusManager.cpp +++ b/src/framework/EventBusManager.cpp @@ -9,8 +9,6 @@ namespace OpenWifi { - EventBusManager::EventBusManager(Poco::Logger &L) : Logger_(L) {} - void EventBusManager::run() { Running_ = true; Utils::SetThreadName("fmwk:EventMgr"); diff --git a/src/framework/EventBusManager.h b/src/framework/EventBusManager.h index 9a7316e..fe8a82c 100644 --- a/src/framework/EventBusManager.h +++ b/src/framework/EventBusManager.h @@ -12,6 +12,16 @@ namespace OpenWifi { class EventBusManager : public Poco::Runnable { public: + EventBusManager() : + Logger_(Poco::Logger::create( + "EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())) { + } + + static auto instance() { + static auto instance_ = new EventBusManager; + return instance_; + } + explicit EventBusManager(Poco::Logger &L); void run() final; void Start(); @@ -24,4 +34,6 @@ namespace OpenWifi { Poco::Logger &Logger_; }; + inline auto EventBusManager() { return EventBusManager::instance(); } + } // namespace OpenWifi diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 6c1aed3..fabc65b 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -48,8 +48,8 @@ namespace OpenWifi { const std::string &Payload) { std::lock_guard G(InfraMutex_); - Poco::Logger &BusLogger = Poco::Logger::create( - "BusMessageReceived", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel()); + Poco::Logger &BusLogger = EventBusManager()->Logger(); + try { Poco::JSON::Parser P; auto Object = P.parse(Payload).extract(); @@ -552,14 +552,12 @@ namespace OpenWifi { for (auto i : SubSystems_) { i->Start(); } - EventBusManager_ = std::make_unique(Poco::Logger::create( - "EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())); - EventBusManager_->Start(); + EventBusManager()->Start(); } void MicroService::StopSubSystemServers() { AddActivity("Stopping"); - EventBusManager_->Stop(); + EventBusManager()->Stop(); for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) { (*i)->Stop(); } diff --git a/src/framework/MicroService.h b/src/framework/MicroService.h index 00532b4..7290bbb 100644 --- a/src/framework/MicroService.h +++ b/src/framework/MicroService.h @@ -201,7 +201,6 @@ namespace OpenWifi { Poco::JWT::Signer Signer_; Poco::Logger &Logger_; Poco::ThreadPool TimerPool_{"timer:pool", 2, 32}; - std::unique_ptr EventBusManager_; }; inline MicroService *MicroService::instance_ = nullptr; From 7e72cc7ac7e3df96e610e26faf7872974046b46c Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Mon, 4 Dec 2023 08:31:31 -0800 Subject: [PATCH 5/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- src/framework/MicroService.cpp | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index fabc65b..13f89fb 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -99,15 +99,21 @@ namespace OpenWifi { .LastUpdate = Utils::Now()}; auto s1 = MakeServiceListString(Services_); + auto PreviousSize = Services_.size(); Services_[PrivateEndPoint] = ServiceInfo; + auto CurrentSize = Services_.size(); if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) { - poco_information( - BusLogger, - fmt::format( - "Service {} ID={} is joining the system. old={}", - Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) - .toString(), - ID, s1)); + if(!s1.empty()) { + poco_information( + BusLogger, + fmt::format( + "Service {} ID={} is joining the system.", + Object + ->get( + KafkaTopics::ServiceEvents::Fields::PRIVATE) + .toString(), + ID)); + } std::string SvcList; for (const auto &Svc : Services_) { if (SvcList.empty()) @@ -118,6 +124,15 @@ namespace OpenWifi { poco_information( BusLogger, fmt::format("Current list of microservices: {}", SvcList)); + } else if(CurrentSize!=PreviousSize) { + poco_information( + BusLogger, + fmt::format( + "Service {} ID={} is being added back in.", + Object + ->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) + .toString(), + ID)); } } } else { From c020e702dfe4a27c18eff1abc79108fb46aa829b Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Mon, 11 Dec 2023 09:48:04 -0800 Subject: [PATCH 6/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- src/framework/orm.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/framework/orm.h b/src/framework/orm.h index 9e83540..c0be137 100644 --- a/src/framework/orm.h +++ b/src/framework/orm.h @@ -576,8 +576,8 @@ namespace ORM { bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) { try { assert(ValidFieldName(FieldName)); - Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Update(Session); RecordTuple RT; @@ -593,6 +593,7 @@ namespace ORM { Update.execute(); if (Cache_) Cache_->UpdateCache(R); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); @@ -662,6 +663,7 @@ namespace ORM { assert(ValidFieldName(FieldName)); Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Delete(Session); std::string St = "delete from " + TableName_ + " where " + FieldName + "=?"; @@ -671,6 +673,7 @@ namespace ORM { Delete.execute(); if (Cache_) Cache_->Delete(FieldName, Value); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); @@ -682,11 +685,13 @@ namespace ORM { try { assert(!WhereClause.empty()); Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Delete(Session); std::string St = "delete from " + TableName_ + " where " + WhereClause; Delete << St; Delete.execute(); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); From 0178b5e5d0d7391f6a60d9cedd3298ddad8990af Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Fri, 15 Dec 2023 07:26:30 -0800 Subject: [PATCH 7/7] https://telecominfraproject.atlassian.net/browse/WIFI-13172 Signed-off-by: stephb9959 --- build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build b/build index 7813681..62f9457 100644 --- a/build +++ b/build @@ -1 +1 @@ -5 \ No newline at end of file +6 \ No newline at end of file