Compare commits

..

10 Commits

Author SHA1 Message Date
TIP Automation User
4031e50aed Chg: update image tag in helm values to v3.0.0 2023-12-29 15:19:29 +00:00
TIP Automation User
d4f4f45ec1 Chg: update image tag in helm values to v3.0.0-RC2 2023-12-15 23:03:24 +00:00
Stephane Bourque
183914dae7 Merge pull request #87 from Telecominfraproject/main
https://telecominfraproject.atlassian.net/browse/WIFI-13147
2023-12-15 09:30:28 -08:00
stephb9959
0178b5e5d0 https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-15 07:26:30 -08:00
stephb9959
c020e702df https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-11 09:48:04 -08:00
stephb9959
7e72cc7ac7 https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-04 08:31:31 -08:00
stephb9959
a283f31d7f https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-04 07:41:06 -08:00
stephb9959
13d2d39aed https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-03 13:23:00 -08:00
stephb9959
7d5c130d5c https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-03 12:05:20 -08:00
stephb9959
bc4da0aaeb https://telecominfraproject.atlassian.net/browse/WIFI-13172
Signed-off-by: stephb9959 <stephane.bourque@gmail.com>
2023-12-02 13:29:23 -08:00
9 changed files with 185 additions and 49 deletions

2
build
View File

@@ -1 +1 @@
2 6

View File

@@ -9,7 +9,7 @@ fullnameOverride: ""
images: images:
owfms: owfms:
repository: tip-tip-wlan-cloud-ucentral.jfrog.io/owfms repository: tip-tip-wlan-cloud-ucentral.jfrog.io/owfms
tag: v3.0.0-RC1 tag: v3.0.0
pullPolicy: Always pullPolicy: Always
# regcred: # regcred:
# registry: tip-tip-wlan-cloud-ucentral.jfrog.io # registry: tip-tip-wlan-cloud-ucentral.jfrog.io

View File

@@ -9,8 +9,6 @@
namespace OpenWifi { namespace OpenWifi {
EventBusManager::EventBusManager(Poco::Logger &L) : Logger_(L) {}
void EventBusManager::run() { void EventBusManager::run() {
Running_ = true; Running_ = true;
Utils::SetThreadName("fmwk:EventMgr"); Utils::SetThreadName("fmwk:EventMgr");

View File

@@ -12,6 +12,16 @@ namespace OpenWifi {
class EventBusManager : public Poco::Runnable { class EventBusManager : public Poco::Runnable {
public: public:
EventBusManager() :
Logger_(Poco::Logger::create(
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())) {
}
static auto instance() {
static auto instance_ = new EventBusManager;
return instance_;
}
explicit EventBusManager(Poco::Logger &L); explicit EventBusManager(Poco::Logger &L);
void run() final; void run() final;
void Start(); void Start();
@@ -24,4 +34,6 @@ namespace OpenWifi {
Poco::Logger &Logger_; Poco::Logger &Logger_;
}; };
inline auto EventBusManager() { return EventBusManager::instance(); }
} // namespace OpenWifi } // namespace OpenWifi

View File

@@ -33,9 +33,23 @@ namespace OpenWifi {
void MicroService::Exit(int Reason) { std::exit(Reason); } void MicroService::Exit(int Reason) { std::exit(Reason); }
static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) {
std::string SvcList;
for (const auto &Svc : Services) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
return SvcList;
}
void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key, void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key,
const std::string &Payload) { const std::string &Payload) {
std::lock_guard G(InfraMutex_); std::lock_guard G(InfraMutex_);
Poco::Logger &BusLogger = EventBusManager()->Logger();
try { try {
Poco::JSON::Parser P; Poco::JSON::Parser P;
auto Object = P.parse(Payload).extract<Poco::JSON::Object::Ptr>(); auto Object = P.parse(Payload).extract<Poco::JSON::Object::Ptr>();
@@ -55,13 +69,10 @@ namespace OpenWifi {
Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) { Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) {
auto PrivateEndPoint = auto PrivateEndPoint =
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(); Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString();
if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
Services_.find(PrivateEndPoint) != Services_.end()) {
Services_[PrivateEndPoint].LastUpdate = Utils::Now();
} else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
Services_.erase(PrivateEndPoint); Services_.erase(PrivateEndPoint);
poco_debug( poco_information(
logger(), BusLogger,
fmt::format( fmt::format(
"Service {} ID={} leaving system.", "Service {} ID={} leaving system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE) Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
@@ -69,14 +80,7 @@ namespace OpenWifi {
ID)); ID));
} else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN || } else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN ||
Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) { Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) {
poco_debug( auto ServiceInfo = Types::MicroServiceMeta{
logger(),
fmt::format(
"Service {} ID={} joining system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
Services_[PrivateEndPoint] = Types::MicroServiceMeta{
.Id = ID, .Id = ID,
.Type = Poco::toLower( .Type = Poco::toLower(
Object->get(KafkaTopics::ServiceEvents::Fields::TYPE) Object->get(KafkaTopics::ServiceEvents::Fields::TYPE)
@@ -94,20 +98,46 @@ namespace OpenWifi {
.toString(), .toString(),
.LastUpdate = Utils::Now()}; .LastUpdate = Utils::Now()};
std::string SvcList; auto s1 = MakeServiceListString(Services_);
for (const auto &Svc : Services_) { auto PreviousSize = Services_.size();
if (SvcList.empty()) Services_[PrivateEndPoint] = ServiceInfo;
SvcList = Svc.second.Type; auto CurrentSize = Services_.size();
else if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) {
SvcList += ", " + Svc.second.Type; if(!s1.empty()) {
poco_information(
BusLogger,
fmt::format(
"Service {} ID={} is joining the system.",
Object
->get(
KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
}
std::string SvcList;
for (const auto &Svc : Services_) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
poco_information(
BusLogger,
fmt::format("Current list of microservices: {}", SvcList));
} else if(CurrentSize!=PreviousSize) {
poco_information(
BusLogger,
fmt::format(
"Service {} ID={} is being added back in.",
Object
->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
} }
poco_information(
logger(),
fmt::format("Current list of microservices: {}", SvcList));
} }
} else { } else {
poco_error( poco_information(
logger(), BusLogger,
fmt::format("KAFKA-MSG: invalid event '{}', missing a field.", fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",
Event)); Event));
} }
@@ -118,32 +148,39 @@ namespace OpenWifi {
Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString()); Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString());
#endif #endif
} else { } else {
poco_error( poco_information(
logger(), BusLogger,
fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event)); fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event));
} }
} else { } else {
poco_error(logger(), poco_information(BusLogger,
fmt::format("Unknown Event: {} Source: {}", Event, ID)); fmt::format("Unknown Event: {} Source: {}", Event, ID));
} }
} }
} else { } else {
poco_error(logger(), "Bad bus message."); std::ostringstream os;
std::ostringstream os; Object->stringify(std::cout);
Object->stringify(std::cout); poco_error(BusLogger, fmt::format("Bad bus message: {}", os.str()));
} }
auto i = Services_.begin(); auto ServiceHint = Services_.begin();
auto now = Utils::Now(); auto now = Utils::Now();
for (; i != Services_.end();) { auto si1 = Services_.size();
if ((now - i->second.LastUpdate) > 60) { auto ss1 = MakeServiceListString(Services_);
i = Services_.erase(i); while(ServiceHint!=Services_.end()) {
if ((now - ServiceHint->second.LastUpdate) > 120) {
poco_information(BusLogger, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint));
ServiceHint = Services_.erase(ServiceHint);
} else } else
++i; ++ServiceHint;
} }
if(Services_.size() != si1) {
auto ss2 = MakeServiceListString(Services_);
poco_information(BusLogger, fmt::format("Current list of microservices: {} -> {}", ss1, ss2));
}
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
logger().log(E); BusLogger.log(E);
} }
} }
@@ -412,7 +449,7 @@ namespace OpenWifi {
try { try {
DataDir.createDirectory(); DataDir.createDirectory();
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
logger().log(E); Logger_.log(E);
} }
} }
WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", ""); WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", "");
@@ -530,14 +567,12 @@ namespace OpenWifi {
for (auto i : SubSystems_) { for (auto i : SubSystems_) {
i->Start(); i->Start();
} }
EventBusManager_ = std::make_unique<EventBusManager>(Poco::Logger::create( EventBusManager()->Start();
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel()));
EventBusManager_->Start();
} }
void MicroService::StopSubSystemServers() { void MicroService::StopSubSystemServers() {
AddActivity("Stopping"); AddActivity("Stopping");
EventBusManager_->Stop(); EventBusManager()->Stop();
for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) { for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) {
(*i)->Stop(); (*i)->Stop();
} }
@@ -697,7 +732,7 @@ namespace OpenWifi {
auto APIKEY = Request.get("X-API-KEY"); auto APIKEY = Request.get("X-API-KEY");
return APIKEY == MyHash_; return APIKEY == MyHash_;
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
logger().log(E); Logger_.log(E);
} }
return false; return false;
} }

View File

@@ -201,7 +201,6 @@ namespace OpenWifi {
Poco::JWT::Signer Signer_; Poco::JWT::Signer Signer_;
Poco::Logger &Logger_; Poco::Logger &Logger_;
Poco::ThreadPool TimerPool_{"timer:pool", 2, 32}; Poco::ThreadPool TimerPool_{"timer:pool", 2, 32};
std::unique_ptr<EventBusManager> EventBusManager_;
}; };
inline MicroService *MicroService::instance_ = nullptr; inline MicroService *MicroService::instance_ = nullptr;

View File

@@ -47,6 +47,8 @@ namespace OpenWifi {
} }
Poco::Data::SessionPool &Pool() { return *Pool_; }
private: private:
inline int Setup_SQLite(); inline int Setup_SQLite();
inline int Setup_MySQL(); inline int Setup_MySQL();

View File

@@ -576,8 +576,8 @@ namespace ORM {
bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) { bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) {
try { try {
assert(ValidFieldName(FieldName)); assert(ValidFieldName(FieldName));
Poco::Data::Session Session = Pool_.get(); Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Update(Session); Poco::Data::Statement Update(Session);
RecordTuple RT; RecordTuple RT;
@@ -593,6 +593,7 @@ namespace ORM {
Update.execute(); Update.execute();
if (Cache_) if (Cache_)
Cache_->UpdateCache(R); Cache_->UpdateCache(R);
Session.commit();
return true; return true;
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
Logger_.log(E); Logger_.log(E);
@@ -662,6 +663,7 @@ namespace ORM {
assert(ValidFieldName(FieldName)); assert(ValidFieldName(FieldName));
Poco::Data::Session Session = Pool_.get(); Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Delete(Session); Poco::Data::Statement Delete(Session);
std::string St = "delete from " + TableName_ + " where " + FieldName + "=?"; std::string St = "delete from " + TableName_ + " where " + FieldName + "=?";
@@ -671,6 +673,7 @@ namespace ORM {
Delete.execute(); Delete.execute();
if (Cache_) if (Cache_)
Cache_->Delete(FieldName, Value); Cache_->Delete(FieldName, Value);
Session.commit();
return true; return true;
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
Logger_.log(E); Logger_.log(E);
@@ -682,11 +685,13 @@ namespace ORM {
try { try {
assert(!WhereClause.empty()); assert(!WhereClause.empty());
Poco::Data::Session Session = Pool_.get(); Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Delete(Session); Poco::Data::Statement Delete(Session);
std::string St = "delete from " + TableName_ + " where " + WhereClause; std::string St = "delete from " + TableName_ + " where " + WhereClause;
Delete << St; Delete << St;
Delete.execute(); Delete.execute();
Session.commit();
return true; return true;
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
Logger_.log(E); Logger_.log(E);

View File

@@ -316,5 +316,90 @@ namespace OpenWifi::Utils {
uint32_t Port; uint32_t Port;
}; };
class CompressedString {
public:
CompressedString() {
DecompressedSize_ = 0;
};
explicit CompressedString(const std::string &Data) : DecompressedSize_(Data.size()) {
CompressIt(Data);
}
CompressedString(const CompressedString &Data) {
this->DecompressedSize_ = Data.DecompressedSize_;
this->CompressedData_ = Data.CompressedData_;
}
CompressedString& operator=(const CompressedString& rhs) {
if (this != &rhs) {
this->DecompressedSize_ = rhs.DecompressedSize_;
this->CompressedData_ = rhs.CompressedData_;
}
return *this;
}
CompressedString& operator=(CompressedString&& rhs) {
if (this != &rhs) {
this->DecompressedSize_ = rhs.DecompressedSize_;
this->CompressedData_ = rhs.CompressedData_;
}
return *this;
}
~CompressedString() = default;
operator std::string() const {
return DecompressIt();
}
CompressedString &operator=(const std::string &Data) {
DecompressedSize_ = Data.size();
CompressIt(Data);
return *this;
}
auto CompressedSize() const { return CompressedData_.size(); }
auto DecompressedSize() const { return DecompressedSize_; }
private:
std::string CompressedData_;
std::size_t DecompressedSize_;
inline void CompressIt(const std::string &Data) {
z_stream strm; // = {0};
CompressedData_.resize(Data.size());
strm.next_in = (Bytef *)Data.data();
strm.avail_in = Data.size();
strm.next_out = (Bytef *)CompressedData_.data();
strm.avail_out = Data.size();
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
deflate(&strm, Z_FINISH);
deflateEnd(&strm);
CompressedData_.resize(strm.total_out);
}
[[nodiscard]] std::string DecompressIt() const {
std::string Result;
if(DecompressedSize_!=0) {
Result.resize(DecompressedSize_);
z_stream strm ; //= {0};
strm.next_in = (Bytef *)CompressedData_.data();
strm.avail_in = CompressedData_.size();
strm.next_out = (Bytef *)Result.data();
strm.avail_out = Result.size();
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
inflateInit2(&strm, 15 + 32);
inflate(&strm, Z_FINISH);
inflateEnd(&strm);
}
return Result;
}
};
} // namespace OpenWifi::Utils } // namespace OpenWifi::Utils