diff --git a/build b/build index d8263ee..62f9457 100644 --- a/build +++ b/build @@ -1 +1 @@ -2 \ No newline at end of file +6 \ No newline at end of file diff --git a/src/framework/EventBusManager.cpp b/src/framework/EventBusManager.cpp index ca28ad9..66774f1 100644 --- a/src/framework/EventBusManager.cpp +++ b/src/framework/EventBusManager.cpp @@ -9,8 +9,6 @@ namespace OpenWifi { - EventBusManager::EventBusManager(Poco::Logger &L) : Logger_(L) {} - void EventBusManager::run() { Running_ = true; Utils::SetThreadName("fmwk:EventMgr"); diff --git a/src/framework/EventBusManager.h b/src/framework/EventBusManager.h index 9a7316e..fe8a82c 100644 --- a/src/framework/EventBusManager.h +++ b/src/framework/EventBusManager.h @@ -12,6 +12,16 @@ namespace OpenWifi { class EventBusManager : public Poco::Runnable { public: + EventBusManager() : + Logger_(Poco::Logger::create( + "EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())) { + } + + static auto instance() { + static auto instance_ = new EventBusManager; + return instance_; + } + explicit EventBusManager(Poco::Logger &L); void run() final; void Start(); @@ -24,4 +34,6 @@ namespace OpenWifi { Poco::Logger &Logger_; }; + inline auto EventBusManager() { return EventBusManager::instance(); } + } // namespace OpenWifi diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 5d517d9..13f89fb 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -33,9 +33,23 @@ namespace OpenWifi { void MicroService::Exit(int Reason) { std::exit(Reason); } + static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) { + std::string SvcList; + for (const auto &Svc : Services) { + if (SvcList.empty()) + SvcList = Svc.second.Type; + else + SvcList += ", " + Svc.second.Type; + } + return SvcList; + } + void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key, const std::string &Payload) { std::lock_guard G(InfraMutex_); + + Poco::Logger &BusLogger = EventBusManager()->Logger(); + try { Poco::JSON::Parser P; auto Object = P.parse(Payload).extract(); @@ -55,13 +69,10 @@ namespace OpenWifi { Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) { auto PrivateEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(); - if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && - Services_.find(PrivateEndPoint) != Services_.end()) { - Services_[PrivateEndPoint].LastUpdate = Utils::Now(); - } else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { + if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { Services_.erase(PrivateEndPoint); - poco_debug( - logger(), + poco_information( + BusLogger, fmt::format( "Service {} ID={} leaving system.", Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) @@ -69,14 +80,7 @@ namespace OpenWifi { ID)); } else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN || Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) { - poco_debug( - logger(), - fmt::format( - "Service {} ID={} joining system.", - Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) - .toString(), - ID)); - Services_[PrivateEndPoint] = Types::MicroServiceMeta{ + auto ServiceInfo = Types::MicroServiceMeta{ .Id = ID, .Type = Poco::toLower( Object->get(KafkaTopics::ServiceEvents::Fields::TYPE) @@ -94,20 +98,46 @@ namespace OpenWifi { .toString(), .LastUpdate = Utils::Now()}; - std::string SvcList; - for (const auto &Svc : Services_) { - if (SvcList.empty()) - SvcList = Svc.second.Type; - else - SvcList += ", " + Svc.second.Type; + auto s1 = MakeServiceListString(Services_); + auto PreviousSize = Services_.size(); + Services_[PrivateEndPoint] = ServiceInfo; + auto CurrentSize = Services_.size(); + if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) { + if(!s1.empty()) { + poco_information( + BusLogger, + fmt::format( + "Service {} ID={} is joining the system.", + Object + ->get( + KafkaTopics::ServiceEvents::Fields::PRIVATE) + .toString(), + ID)); + } + std::string SvcList; + for (const auto &Svc : Services_) { + if (SvcList.empty()) + SvcList = Svc.second.Type; + else + SvcList += ", " + Svc.second.Type; + } + poco_information( + BusLogger, + fmt::format("Current list of microservices: {}", SvcList)); + } else if(CurrentSize!=PreviousSize) { + poco_information( + BusLogger, + fmt::format( + "Service {} ID={} is being added back in.", + Object + ->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) + .toString(), + ID)); } - poco_information( - logger(), - fmt::format("Current list of microservices: {}", SvcList)); } } else { - poco_error( - logger(), + poco_information( + BusLogger, fmt::format("KAFKA-MSG: invalid event '{}', missing a field.", Event)); } @@ -118,32 +148,39 @@ namespace OpenWifi { Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString()); #endif } else { - poco_error( - logger(), + poco_information( + BusLogger, fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event)); } } else { - poco_error(logger(), + poco_information(BusLogger, fmt::format("Unknown Event: {} Source: {}", Event, ID)); } } } else { - poco_error(logger(), "Bad bus message."); - std::ostringstream os; - Object->stringify(std::cout); + std::ostringstream os; + Object->stringify(std::cout); + poco_error(BusLogger, fmt::format("Bad bus message: {}", os.str())); } - auto i = Services_.begin(); + auto ServiceHint = Services_.begin(); auto now = Utils::Now(); - for (; i != Services_.end();) { - if ((now - i->second.LastUpdate) > 60) { - i = Services_.erase(i); + auto si1 = Services_.size(); + auto ss1 = MakeServiceListString(Services_); + while(ServiceHint!=Services_.end()) { + if ((now - ServiceHint->second.LastUpdate) > 120) { + poco_information(BusLogger, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint)); + ServiceHint = Services_.erase(ServiceHint); } else - ++i; + ++ServiceHint; } + if(Services_.size() != si1) { + auto ss2 = MakeServiceListString(Services_); + poco_information(BusLogger, fmt::format("Current list of microservices: {} -> {}", ss1, ss2)); + } } catch (const Poco::Exception &E) { - logger().log(E); + BusLogger.log(E); } } @@ -412,7 +449,7 @@ namespace OpenWifi { try { DataDir.createDirectory(); } catch (const Poco::Exception &E) { - logger().log(E); + Logger_.log(E); } } WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", ""); @@ -530,14 +567,12 @@ namespace OpenWifi { for (auto i : SubSystems_) { i->Start(); } - EventBusManager_ = std::make_unique(Poco::Logger::create( - "EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())); - EventBusManager_->Start(); + EventBusManager()->Start(); } void MicroService::StopSubSystemServers() { AddActivity("Stopping"); - EventBusManager_->Stop(); + EventBusManager()->Stop(); for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) { (*i)->Stop(); } @@ -697,7 +732,7 @@ namespace OpenWifi { auto APIKEY = Request.get("X-API-KEY"); return APIKEY == MyHash_; } catch (const Poco::Exception &E) { - logger().log(E); + Logger_.log(E); } return false; } diff --git a/src/framework/MicroService.h b/src/framework/MicroService.h index 00532b4..7290bbb 100644 --- a/src/framework/MicroService.h +++ b/src/framework/MicroService.h @@ -201,7 +201,6 @@ namespace OpenWifi { Poco::JWT::Signer Signer_; Poco::Logger &Logger_; Poco::ThreadPool TimerPool_{"timer:pool", 2, 32}; - std::unique_ptr EventBusManager_; }; inline MicroService *MicroService::instance_ = nullptr; diff --git a/src/framework/StorageClass.h b/src/framework/StorageClass.h index cfb5530..36751bc 100644 --- a/src/framework/StorageClass.h +++ b/src/framework/StorageClass.h @@ -47,6 +47,8 @@ namespace OpenWifi { } + Poco::Data::SessionPool &Pool() { return *Pool_; } + private: inline int Setup_SQLite(); inline int Setup_MySQL(); diff --git a/src/framework/orm.h b/src/framework/orm.h index 9e83540..c0be137 100644 --- a/src/framework/orm.h +++ b/src/framework/orm.h @@ -576,8 +576,8 @@ namespace ORM { bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) { try { assert(ValidFieldName(FieldName)); - Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Update(Session); RecordTuple RT; @@ -593,6 +593,7 @@ namespace ORM { Update.execute(); if (Cache_) Cache_->UpdateCache(R); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); @@ -662,6 +663,7 @@ namespace ORM { assert(ValidFieldName(FieldName)); Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Delete(Session); std::string St = "delete from " + TableName_ + " where " + FieldName + "=?"; @@ -671,6 +673,7 @@ namespace ORM { Delete.execute(); if (Cache_) Cache_->Delete(FieldName, Value); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); @@ -682,11 +685,13 @@ namespace ORM { try { assert(!WhereClause.empty()); Poco::Data::Session Session = Pool_.get(); + Session.begin(); Poco::Data::Statement Delete(Session); std::string St = "delete from " + TableName_ + " where " + WhereClause; Delete << St; Delete.execute(); + Session.commit(); return true; } catch (const Poco::Exception &E) { Logger_.log(E); diff --git a/src/framework/utils.h b/src/framework/utils.h index fa0d58d..b20d32a 100644 --- a/src/framework/utils.h +++ b/src/framework/utils.h @@ -316,5 +316,90 @@ namespace OpenWifi::Utils { uint32_t Port; }; + class CompressedString { + public: + CompressedString() { + DecompressedSize_ = 0; + }; + + explicit CompressedString(const std::string &Data) : DecompressedSize_(Data.size()) { + CompressIt(Data); + } + + CompressedString(const CompressedString &Data) { + this->DecompressedSize_ = Data.DecompressedSize_; + this->CompressedData_ = Data.CompressedData_; + } + + CompressedString& operator=(const CompressedString& rhs) { + if (this != &rhs) { + this->DecompressedSize_ = rhs.DecompressedSize_; + this->CompressedData_ = rhs.CompressedData_; + } + return *this; + } + + CompressedString& operator=(CompressedString&& rhs) { + if (this != &rhs) { + this->DecompressedSize_ = rhs.DecompressedSize_; + this->CompressedData_ = rhs.CompressedData_; + } + return *this; + } + + ~CompressedString() = default; + + operator std::string() const { + return DecompressIt(); + } + + CompressedString &operator=(const std::string &Data) { + DecompressedSize_ = Data.size(); + CompressIt(Data); + return *this; + } + + auto CompressedSize() const { return CompressedData_.size(); } + auto DecompressedSize() const { return DecompressedSize_; } + + private: + std::string CompressedData_; + std::size_t DecompressedSize_; + + inline void CompressIt(const std::string &Data) { + z_stream strm; // = {0}; + CompressedData_.resize(Data.size()); + strm.next_in = (Bytef *)Data.data(); + strm.avail_in = Data.size(); + strm.next_out = (Bytef *)CompressedData_.data(); + strm.avail_out = Data.size(); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + deflate(&strm, Z_FINISH); + deflateEnd(&strm); + CompressedData_.resize(strm.total_out); + } + + [[nodiscard]] std::string DecompressIt() const { + std::string Result; + if(DecompressedSize_!=0) { + Result.resize(DecompressedSize_); + z_stream strm ; //= {0}; + strm.next_in = (Bytef *)CompressedData_.data(); + strm.avail_in = CompressedData_.size(); + strm.next_out = (Bytef *)Result.data(); + strm.avail_out = Result.size(); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + inflateInit2(&strm, 15 + 32); + inflate(&strm, Z_FINISH); + inflateEnd(&strm); + } + return Result; + } + }; } // namespace OpenWifi::Utils