From b3bca3003a8f763f39ff7ffd6e29559b336f6c40 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Fri, 1 Sep 2023 09:17:44 -0700 Subject: [PATCH 01/11] https://telecominfraproject.atlassian.net/browse/WIFI-12868 Signed-off-by: stephb9959 --- src/framework/ALBserver.cpp | 2 +- src/framework/ALBserver.h | 14 +++++++++++ src/framework/KafkaManager.cpp | 2 +- src/framework/KafkaManager.h | 12 +++++----- src/framework/MicroService.cpp | 2 ++ src/framework/MicroServiceFuncs.cpp | 6 +++++ src/framework/MicroServiceFuncs.h | 1 + src/framework/OpenWifiTypes.h | 3 +++ src/framework/RESTAPI_utils.h | 36 +++++++++++++++++++++++++++++ src/framework/StorageClass.h | 20 +++++++++------- src/framework/utils.cpp | 9 ++++++++ src/framework/utils.h | 1 + 12 files changed, 92 insertions(+), 16 deletions(-) diff --git a/src/framework/ALBserver.cpp b/src/framework/ALBserver.cpp index 1a08503..ac10319 100644 --- a/src/framework/ALBserver.cpp +++ b/src/framework/ALBserver.cpp @@ -26,7 +26,7 @@ namespace OpenWifi { Response.set("Connection", "keep-alive"); Response.setVersion(Poco::Net::HTTPMessage::HTTP_1_1); std::ostream &Answer = Response.send(); - Answer << "process Alive and kicking!"; + Answer << ALBHealthCheckServer()->CallbackText(); } catch (...) { } } diff --git a/src/framework/ALBserver.h b/src/framework/ALBserver.h index 6599ac8..c52a570 100644 --- a/src/framework/ALBserver.h +++ b/src/framework/ALBserver.h @@ -37,6 +37,8 @@ namespace OpenWifi { inline static std::atomic_uint64_t req_id_ = 1; }; + typedef std::string ALBHealthMessageCallback(); + class ALBHealthCheckServer : public SubSystemServer { public: ALBHealthCheckServer(); @@ -48,10 +50,22 @@ namespace OpenWifi { int Start() override; void Stop() override; + inline void RegisterExtendedHealthMessage(ALBHealthMessageCallback *F) { + Callback_=F; + }; + + inline std::string CallbackText() { + if(Callback_== nullptr) { + return "process Alive and kicking!"; + } else { + return Callback_(); + } + } private: std::unique_ptr Server_; std::unique_ptr Socket_; + ALBHealthMessageCallback *Callback_= nullptr; int Port_ = 0; mutable std::atomic_bool Running_ = false; }; diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index b8193b8..09527a5 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -213,7 +213,7 @@ namespace OpenWifi { } void KafkaProducer::Produce(const char *Topic, const std::string &Key, - const std::shared_ptr Payload) { + std::shared_ptr Payload) { std::lock_guard G(Mutex_); Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); } diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index b78c072..3ef5940 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -18,8 +18,8 @@ namespace OpenWifi { class KafkaMessage : public Poco::Notification { public: - KafkaMessage(const char * Topic, const std::string &Key, const std::shared_ptr Payload) - : Topic_(Topic), Key_(Key), Payload_(std::move(Payload)) {} + KafkaMessage(const char * Topic, const std::string &Key, std::shared_ptr Payload) + : Topic_(Topic), Key_(Key), Payload_(Payload) {} inline const char * Topic() { return Topic_; } inline const std::string &Key() { return Key_; } @@ -36,7 +36,7 @@ namespace OpenWifi { void run() override; void Start(); void Stop(); - void Produce(const char *Topic, const std::string &Key, const std::shared_ptr Payload); + void Produce(const char *Topic, const std::string &Key, std::shared_ptr Payload); private: std::recursive_mutex Mutex_; @@ -92,9 +92,9 @@ namespace OpenWifi { void Stop() override; void PostMessage(const char *topic, const std::string &key, - const std::shared_ptr PayLoad, bool WrapMessage = true); - void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr Payload); - [[nodiscard]] const std::shared_ptr WrapSystemId(const std::shared_ptr PayLoad); + std::shared_ptr PayLoad, bool WrapMessage = true); + void Dispatch(const char *Topic, const std::string &Key, std::shared_ptr Payload); + [[nodiscard]] const std::shared_ptr WrapSystemId(std::shared_ptr PayLoad); [[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; } uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id); diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 31d6837..5d517d9 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -129,6 +129,8 @@ namespace OpenWifi { } } else { poco_error(logger(), "Bad bus message."); + std::ostringstream os; + Object->stringify(std::cout); } auto i = Services_.begin(); diff --git a/src/framework/MicroServiceFuncs.cpp b/src/framework/MicroServiceFuncs.cpp index 367d464..6664981 100644 --- a/src/framework/MicroServiceFuncs.cpp +++ b/src/framework/MicroServiceFuncs.cpp @@ -5,6 +5,8 @@ #include "framework/MicroServiceFuncs.h" #include "framework/MicroService.h" +#include "framework/ALBserver.h" + namespace OpenWifi { const std::string &MicroServiceDataDirectory() { return MicroService::instance().DataDir(); } @@ -123,4 +125,8 @@ namespace OpenWifi { return MicroService::instance().AllowExternalMicroServices(); } + void MicroServiceALBCallback( std::string Callback()) { + return ALBHealthCheckServer()->RegisterExtendedHealthMessage(Callback); + } + } // namespace OpenWifi diff --git a/src/framework/MicroServiceFuncs.h b/src/framework/MicroServiceFuncs.h index b266156..9c618b8 100644 --- a/src/framework/MicroServiceFuncs.h +++ b/src/framework/MicroServiceFuncs.h @@ -53,4 +53,5 @@ namespace OpenWifi { std::string MicroServiceGetPublicAPIEndPoint(); void MicroServiceDeleteOverrideConfiguration(); bool AllowExternalMicroServices(); + void MicroServiceALBCallback( std::string Callback()); } // namespace OpenWifi diff --git a/src/framework/OpenWifiTypes.h b/src/framework/OpenWifiTypes.h index 7be530d..39f90ac 100644 --- a/src/framework/OpenWifiTypes.h +++ b/src/framework/OpenWifiTypes.h @@ -28,6 +28,9 @@ namespace OpenWifi::Types { typedef std::string UUID_t; typedef std::vector UUIDvec_t; typedef std::map> Counted3DMapSII; + typedef std::vector IntList; + typedef std::vector UIntList; + typedef std::vector DoubleList; struct MicroServiceMeta { uint64_t Id = 0; diff --git a/src/framework/RESTAPI_utils.h b/src/framework/RESTAPI_utils.h index 72c0afc..27651be 100644 --- a/src/framework/RESTAPI_utils.h +++ b/src/framework/RESTAPI_utils.h @@ -102,6 +102,20 @@ namespace OpenWifi::RESTAPI_utils { Obj.set(Field, A); } + inline void field_to_json(Poco::JSON::Object &Obj, const char *Field, const Types::DoubleList &V) { + Poco::JSON::Array A; + for (const auto &i : V) + A.add(i); + Obj.set(Field, A); + } + + inline void field_to_json(Poco::JSON::Object &Obj, const char *Field, const Types::IntList &V) { + Poco::JSON::Array A; + for (const auto &i : V) + A.add(i); + Obj.set(Field, A); + } + inline void field_to_json(Poco::JSON::Object &Obj, const char *Field, const Types::TagList &V) { Poco::JSON::Array A; for (const auto &i : V) @@ -284,6 +298,28 @@ namespace OpenWifi::RESTAPI_utils { } } + inline void field_from_json(const Poco::JSON::Object::Ptr &Obj, const char *Field, + Types::DoubleList &Value) { + if (Obj->isArray(Field) && !Obj->isNull(Field)) { + Value.clear(); + Poco::JSON::Array::Ptr A = Obj->getArray(Field); + for (const auto &i : *A) { + Value.push_back(i); + } + } + } + + inline void field_from_json(const Poco::JSON::Object::Ptr &Obj, const char *Field, + Types::IntList &Value) { + if (Obj->isArray(Field) && !Obj->isNull(Field)) { + Value.clear(); + Poco::JSON::Array::Ptr A = Obj->getArray(Field); + for (const auto &i : *A) { + Value.push_back(i); + } + } + } + template void field_from_json(const Poco::JSON::Object::Ptr &Obj, const char *Field, std::vector &Value) { diff --git a/src/framework/StorageClass.h b/src/framework/StorageClass.h index 2255de5..cfb5530 100644 --- a/src/framework/StorageClass.h +++ b/src/framework/StorageClass.h @@ -22,9 +22,8 @@ namespace OpenWifi { class StorageClass : public SubSystemServer { public: - StorageClass() noexcept : SubSystemServer("StorageClass", "STORAGE-SVR", "storage") {} - int Start() override { + inline int Start() override { std::lock_guard Guard(Mutex_); Logger().notice("Starting."); @@ -40,17 +39,22 @@ namespace OpenWifi { return 0; } - void Stop() override { Pool_->shutdown(); } + inline void Stop() override { Pool_->shutdown(); } DBType Type() const { return dbType_; }; + StorageClass() noexcept : SubSystemServer("StorageClass", "STORAGE-SVR", "storage") { + + } + private: inline int Setup_SQLite(); inline int Setup_MySQL(); inline int Setup_PostgreSQL(); - protected: - std::unique_ptr Pool_; + + protected: + std::shared_ptr Pool_; Poco::Data::SQLite::Connector SQLiteConn_; Poco::Data::PostgreSQL::Connector PostgresConn_; Poco::Data::MySQL::Connector MySQLConn_; @@ -81,7 +85,7 @@ namespace OpenWifi { // Poco::Data::SessionPool(SQLiteConn_.name(), DBName, 8, // (int)NumSessions, // (int)IdleTime)); - Pool_ = std::make_unique(SQLiteConn_.name(), DBName, 8, + Pool_ = std::make_shared(SQLiteConn_.name(), DBName, 8, (int)NumSessions, (int)IdleTime); return 0; } @@ -102,7 +106,7 @@ namespace OpenWifi { ";compress=true;auto-reconnect=true"; Poco::Data::MySQL::Connector::registerConnector(); - Pool_ = std::make_unique(MySQLConn_.name(), ConnectionStr, 8, + Pool_ = std::make_shared(MySQLConn_.name(), ConnectionStr, 8, NumSessions, IdleTime); return 0; @@ -126,7 +130,7 @@ namespace OpenWifi { " connect_timeout=" + ConnectionTimeout; Poco::Data::PostgreSQL::Connector::registerConnector(); - Pool_ = std::make_unique(PostgresConn_.name(), ConnectionStr, 8, + Pool_ = std::make_shared(PostgresConn_.name(), ConnectionStr, 8, NumSessions, IdleTime); return 0; diff --git a/src/framework/utils.cpp b/src/framework/utils.cpp index c2f4358..a0f4c11 100644 --- a/src/framework/utils.cpp +++ b/src/framework/utils.cpp @@ -132,6 +132,15 @@ namespace OpenWifi::Utils { return std::regex_match(Hostname, HostNameRegex); } + [[nodiscard]] bool ValidNumber(const std::string &number, bool isSigned) + { + static std::regex IntRegex("^-?[0-9]\\d*(\\.\\d+)?$"); + if(!isSigned) { + IntRegex = "^[0-9]\\d*(\\.\\d+)?$"; + } + return std::regex_match(number, IntRegex); + } + [[nodiscard]] std::string ToHex(const std::vector &B) { std::string R; R.reserve(B.size() * 2); diff --git a/src/framework/utils.h b/src/framework/utils.h index b464989..3979dca 100644 --- a/src/framework/utils.h +++ b/src/framework/utils.h @@ -73,6 +73,7 @@ namespace OpenWifi::Utils { [[nodiscard]] bool ValidSerialNumbers(const std::vector &Serial); [[nodiscard]] bool ValidUUID(const std::string &UUID); [[nodiscard]] bool ValidHostname(const std::string &hostname); + [[nodiscard]] bool ValidNumber(const std::string &number, bool isSigned); template std::string ComputeHash(Args &&...args) { Poco::SHA2Engine E; From a5ea86f0dbe221a8dab102b595e776dd828423cf Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Fri, 1 Sep 2023 09:25:00 -0700 Subject: [PATCH 02/11] https://telecominfraproject.atlassian.net/browse/WIFI-12868 Signed-off-by: stephb9959 --- src/framework/RESTAPI_Handler.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/framework/RESTAPI_Handler.h b/src/framework/RESTAPI_Handler.h index a7ed681..b74d6e7 100644 --- a/src/framework/RESTAPI_Handler.h +++ b/src/framework/RESTAPI_Handler.h @@ -574,6 +574,16 @@ namespace OpenWifi { Poco::JSON::Stringifier::stringify(Object, Answer); } + inline void ReturnObject(const std::vector &Strings) { + Poco::JSON::Array Arr; + for(const auto &String:Strings) { + Arr.add(String); + } + std::ostringstream os; + Arr.stringify(os); + return ReturnRawJSON(os.str()); + } + inline void ReturnRawJSON(const std::string &json_doc) { PrepareResponse(); if (Request != nullptr) { From d728948ece6db3bf2cc73f3bee3751598215ce26 Mon Sep 17 00:00:00 2001 From: Adam Capparelli Date: Mon, 11 Sep 2023 14:22:19 -0400 Subject: [PATCH 03/11] WIFI-12937: update building.md dependencies to match Dockerfile. Signed-off-by: Adam Capparelli --- BUILDING.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/BUILDING.md b/BUILDING.md index 4895b2c..843c923 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -16,15 +16,15 @@ from the master copy needed for cmake. Please use the version of this [Poco fix] Poco may take several minutes depending on the platform you are building on. ## Ubuntu -These instructions have proven to work on Ubuntu 20.4. +These instructions have proven to work on Ubuntu 23.04. ```bash -sudo apt install git cmake g++ libssl-dev libmariadb-dev +sudo apt install git cmake g++ libssl-dev libmariadb-dev libmariadbclient-dev-compat sudo apt install libpq-dev libaprutil1-dev apache2-dev libboost-all-dev -sudo apt install librdkafka-dev default-libmysqlclient-dev -sudo apt install nlohmann-json-dev +sudo apt install librdkafka-dev +sudo apt install zlib1g-dev nlohmann-json3-dev ca-certificates libcurl4-openssl-dev cd ~ -git clone https://github.com/AriliaWireless/poco --branch poco-tip-v1 +git clone https://github.com/AriliaWireless/poco --branch poco-tip-v2 cd poco mkdir cmake-build cd cmake-build From 55658f79c672e2814a5a5954165d3c57995e18f5 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Wed, 13 Sep 2023 12:21:25 -0700 Subject: [PATCH 04/11] https://telecominfraproject.atlassian.net/browse/WIFI-7831 Signed-off-by: stephb9959 --- src/framework/EventBusManager.cpp | 6 +- src/framework/KafkaManager.cpp | 25 +++-- src/framework/KafkaManager.h | 21 ++-- src/framework/RESTAPI_Handler.h | 22 ++++- src/framework/SubSystemServer.cpp | 6 +- src/framework/ow_constants.h | 12 ++- src/framework/utils.cpp | 153 +++++++++++++++++++++++++++++- src/framework/utils.h | 13 +++ 8 files changed, 231 insertions(+), 27 deletions(-) diff --git a/src/framework/EventBusManager.cpp b/src/framework/EventBusManager.cpp index 28d8037..ca28ad9 100644 --- a/src/framework/EventBusManager.cpp +++ b/src/framework/EventBusManager.cpp @@ -14,18 +14,18 @@ namespace OpenWifi { void EventBusManager::run() { Running_ = true; Utils::SetThreadName("fmwk:EventMgr"); - auto Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN)); + auto Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); while (Running_) { Poco::Thread::trySleep((unsigned long)MicroServiceDaemonBusTimer()); if (!Running_) break; - Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE)); + Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); } - Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE)); + Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); }; diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 09527a5..d32833e 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -180,7 +180,7 @@ namespace OpenWifi { Consumer.async_commit(Msg); continue; } - KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared(Msg.get_payload())); + KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); if (!AutoCommit) Consumer.async_commit(Msg); } @@ -213,7 +213,7 @@ namespace OpenWifi { } void KafkaProducer::Produce(const char *Topic, const std::string &Key, - std::shared_ptr Payload) { + const std::string &Payload) { std::lock_guard G(Mutex_); Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); } @@ -276,7 +276,7 @@ namespace OpenWifi { } void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key, - const std::shared_ptr Payload) { + const std::string & Payload) { std::lock_guard G(Mutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { @@ -333,20 +333,29 @@ namespace OpenWifi { } void KafkaManager::PostMessage(const char *topic, const std::string &key, - const std::shared_ptr PayLoad, bool WrapMessage) { + const std::string & PayLoad, bool WrapMessage) { if (KafkaEnabled_) { ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad); } } + void KafkaManager::PostMessage(const char *topic, const std::string &key, + const Poco::JSON::Object &Object, bool WrapMessage) { + if (KafkaEnabled_) { + std::ostringstream ObjectStr; + Object.stringify(ObjectStr); + ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(ObjectStr.str()) : ObjectStr.str()); + } + } + + void KafkaManager::Dispatch(const char *Topic, const std::string &Key, - const std::shared_ptr Payload) { + const std::string &Payload) { Dispatcher_.Dispatch(Topic, Key, Payload); } - [[nodiscard]] const std::shared_ptr KafkaManager::WrapSystemId(const std::shared_ptr PayLoad) { - *PayLoad = SystemInfoWrapper_ + *PayLoad + "}"; - return PayLoad; + [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { + return SystemInfoWrapper_ + PayLoad + "}"; } uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index 3ef5940..31cf093 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -6,7 +6,7 @@ #include "Poco/Notification.h" #include "Poco/NotificationQueue.h" - +#include "Poco/JSON/Object.h" #include "framework/KafkaTopics.h" #include "framework/OpenWifiTypes.h" #include "framework/SubSystemServer.h" @@ -18,17 +18,17 @@ namespace OpenWifi { class KafkaMessage : public Poco::Notification { public: - KafkaMessage(const char * Topic, const std::string &Key, std::shared_ptr Payload) + KafkaMessage(const char * Topic, const std::string &Key, const std::string &Payload) : Topic_(Topic), Key_(Key), Payload_(Payload) {} inline const char * Topic() { return Topic_; } inline const std::string &Key() { return Key_; } - inline const std::string &Payload() { return *Payload_; } + inline const std::string &Payload() { return Payload_; } private: const char *Topic_; std::string Key_; - std::shared_ptr Payload_; + std::string Payload_; }; class KafkaProducer : public Poco::Runnable { @@ -36,7 +36,7 @@ namespace OpenWifi { void run() override; void Start(); void Stop(); - void Produce(const char *Topic, const std::string &Key, std::shared_ptr Payload); + void Produce(const char *Topic, const std::string &Key, const std::string & Payload); private: std::recursive_mutex Mutex_; @@ -63,7 +63,7 @@ namespace OpenWifi { void Stop(); auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, int Id); - void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr Payload); + void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload); void run() override; void Topics(std::vector &T); @@ -92,9 +92,12 @@ namespace OpenWifi { void Stop() override; void PostMessage(const char *topic, const std::string &key, - std::shared_ptr PayLoad, bool WrapMessage = true); - void Dispatch(const char *Topic, const std::string &Key, std::shared_ptr Payload); - [[nodiscard]] const std::shared_ptr WrapSystemId(std::shared_ptr PayLoad); + const std::string &PayLoad, bool WrapMessage = true); + void PostMessage(const char *topic, const std::string &key, + const Poco::JSON::Object &Object, bool WrapMessage = true); + + void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload); + [[nodiscard]] std::string WrapSystemId(const std::string & PayLoad); [[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; } uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id); diff --git a/src/framework/RESTAPI_Handler.h b/src/framework/RESTAPI_Handler.h index b74d6e7..d6f31be 100644 --- a/src/framework/RESTAPI_Handler.h +++ b/src/framework/RESTAPI_Handler.h @@ -584,7 +584,27 @@ namespace OpenWifi { return ReturnRawJSON(os.str()); } - inline void ReturnRawJSON(const std::string &json_doc) { + template void ReturnObject(const std::vector &Objects) { + Poco::JSON::Array Arr; + for(const auto &Object:Objects) { + Poco::JSON::Object O; + Object.to_json(O); + Arr.add(O); + } + std::ostringstream os; + Arr.stringify(os); + return ReturnRawJSON(os.str()); + } + + template void ReturnObject(const T &Object) { + Poco::JSON::Object O; + Object.to_json(O); + std::ostringstream os; + O.stringify(os); + return ReturnRawJSON(os.str()); + } + + inline void ReturnRawJSON(const std::string &json_doc) { PrepareResponse(); if (Request != nullptr) { // can we compress ??? diff --git a/src/framework/SubSystemServer.cpp b/src/framework/SubSystemServer.cpp index f6df356..db4dc14 100644 --- a/src/framework/SubSystemServer.cpp +++ b/src/framework/SubSystemServer.cpp @@ -53,7 +53,6 @@ namespace OpenWifi { Context->useCertificate(Cert); Context->addChainCertificate(Root); - Context->addCertificateAuthority(Root); if (level_ == Poco::Net::Context::VERIFY_STRICT) { @@ -76,8 +75,7 @@ namespace OpenWifi { L.fatal(fmt::format("Wrong Certificate({}) for Key({})", cert_file_, key_file_)); } - SSL_CTX_set_verify(SSLCtx, SSL_VERIFY_PEER, nullptr); - + SSL_CTX_set_verify(SSLCtx, SSL_VERIFY_PEER, nullptr); if (level_ == Poco::Net::Context::VERIFY_STRICT) { SSL_CTX_set_client_CA_list(SSLCtx, SSL_load_client_CA_file(client_cas_.c_str())); } @@ -87,7 +85,7 @@ namespace OpenWifi { Context->enableSessionCache(); Context->setSessionCacheSize(0); Context->setSessionTimeout(60); - Context->enableExtendedCertificateVerification(true); + Context->enableExtendedCertificateVerification( level_!= Poco::Net::Context::VERIFY_NONE ); Context->disableStatelessSessionResumption(); } diff --git a/src/framework/ow_constants.h b/src/framework/ow_constants.h index c1a1bf7..65ed7a3 100644 --- a/src/framework/ow_constants.h +++ b/src/framework/ow_constants.h @@ -40,6 +40,7 @@ namespace OpenWifi { }; } +#define DBGLINE std::cout << __LINE__ << ":" << __FILE__ << ", " << __func__ << std::endl; namespace OpenWifi::RESTAPI::Errors { struct msg { uint64_t err_num; @@ -405,7 +406,16 @@ namespace OpenWifi::RESTAPI::Errors { 1172, "The venue name already exists." }; - static const struct msg DefFirmwareNameExists { 1172, "Firmware name already exists." }; + static const struct msg InvalidGlobalReachAccount { + 1173, "Invalid Global Reach account information." + }; + static const struct msg CannotCreateCSR { + 1174, "Cannot create a CSR certificate." + }; + + static const struct msg DefFirmwareNameExists { 1175, "Firmware name already exists." }; + + static const struct msg NotAValidECKey { 1176, "Not a valid Signing Key." }; static const struct msg SimulationDoesNotExist { 7000, "Simulation Instance ID does not exist." diff --git a/src/framework/utils.cpp b/src/framework/utils.cpp index a0f4c11..b4486ba 100644 --- a/src/framework/utils.cpp +++ b/src/framework/utils.cpp @@ -3,7 +3,8 @@ // #include "Poco/Path.h" - +#include "Poco/TemporaryFile.h" +#include "Poco/Crypto/ECKey.h" #include "framework/AppServiceRegistry.h" #include "framework/utils.h" @@ -608,4 +609,154 @@ namespace OpenWifi::Utils { return DT.timestamp().epochTime(); } + static std::string FileToString(const std::string &Filename) { + std::ifstream ifs(Filename.c_str(),std::ios_base::in|std::ios_base::binary); + std::ostringstream os; + Poco::StreamCopier::copyStream(ifs,os); + return os.str(); + } + + bool CreateX509CSR(const CSRCreationParameters & Parameters, CSRCreationResults & Results) { + int ret = 0; + RSA *r = nullptr; + BIGNUM *bne = nullptr; + + int nVersion = 0; + unsigned long e = RSA_F4; + + X509_REQ *x509_req = nullptr; + X509_NAME *x509_name = nullptr; + EVP_PKEY *pKey = nullptr; +// RSA *tem = nullptr; +// BIO *bio_err = nullptr; + + const char *szCountry = Parameters.Country.c_str(); + const char *szProvince = Parameters.Province.c_str(); + const char *szCity = Parameters.City.c_str(); + const char *szOrganization = Parameters.Organization.c_str(); + const char *szCommon = Parameters.CommonName.c_str(); + + Poco::TemporaryFile CsrPath, PubKey, PrivateKey; + std::string Result; + std::ifstream ifs; + std::ostringstream ss; + BIO *bp_public = nullptr, + *bp_private = nullptr, + *bp_csr = nullptr; + + // 1. generate rsa key + bne = BN_new(); + ret = BN_set_word(bne,e); + if(ret != 1){ + goto free_all; + } + + r = RSA_new(); + ret = RSA_generate_key_ex(r, Parameters.bits, bne, nullptr); + if(ret != 1){ + goto free_all; + } + + bp_public = BIO_new_file(PubKey.path().c_str(), "w+"); + ret = PEM_write_bio_RSAPublicKey(bp_public, r); + if(ret != 1) { + goto free_all; + } + + bp_private = BIO_new_file(PrivateKey.path().c_str(), "w+"); + ret = PEM_write_bio_RSAPrivateKey(bp_private, r, NULL, NULL, 0, NULL, NULL); + if(ret != 1) { + goto free_all; + } + +// 2. set version of x509 req + x509_req = X509_REQ_new(); + ret = X509_REQ_set_version(x509_req, nVersion); + if (ret != 1){ + goto free_all; + } + +// 3. set subject of x509 req + x509_name = X509_REQ_get_subject_name(x509_req); + + ret = X509_NAME_add_entry_by_txt(x509_name,"C", MBSTRING_ASC, (const unsigned char*)szCountry, -1, -1, 0); + if (ret != 1){ + goto free_all; + } + + ret = X509_NAME_add_entry_by_txt(x509_name,"ST", MBSTRING_ASC, (const unsigned char*)szProvince, -1, -1, 0); + if (ret != 1){ + goto free_all; + } + + ret = X509_NAME_add_entry_by_txt(x509_name,"L", MBSTRING_ASC, (const unsigned char*)szCity, -1, -1, 0); + if (ret != 1){ + goto free_all; + } + + ret = X509_NAME_add_entry_by_txt(x509_name,"O", MBSTRING_ASC, (const unsigned char*)szOrganization, -1, -1, 0); + if (ret != 1){ + goto free_all; + } + + ret = X509_NAME_add_entry_by_txt(x509_name,"CN", MBSTRING_ASC, (const unsigned char*)szCommon, -1, -1, 0); + if (ret != 1){ + goto free_all; + } + +// 4. set public key of x509 req + pKey = EVP_PKEY_new(); + EVP_PKEY_assign_RSA(pKey, r); + r = nullptr; // will be free rsa when EVP_PKEY_free(pKey) + + ret = X509_REQ_set_pubkey(x509_req, pKey); + if (ret != 1){ + goto free_all; + } + +// 5. set sign key of x509 req + ret = X509_REQ_sign(x509_req, pKey, EVP_sha1()); // return x509_req->signature->length + if (ret <= 0){ + goto free_all; + } + + bp_csr = BIO_new_file(CsrPath.path().c_str(),"w"); + ret = PEM_write_bio_X509_REQ(bp_csr, x509_req); + +// 6. free + free_all: + X509_REQ_free(x509_req); + BIO_free_all(bp_csr); + BIO_free_all(bp_public); + BIO_free_all(bp_private); + + EVP_PKEY_free(pKey); + BN_free(bne); + if(ret==1) { + Results.CSR = FileToString(CsrPath.path()); + Results.PrivateKey = FileToString(PrivateKey.path()); + Results.PublicKey = FileToString(PubKey.path()); + } + + return ret; + } + + bool VerifyECKey(const std::string &key) { + try { + Poco::TemporaryFile F; + + std::ofstream of(F.path().c_str(), std::ios_base::trunc | std::ios_base::out | std::ios_base::binary); + of << key; + of.close(); + + auto Key = Poco::SharedPtr( + new Poco::Crypto::ECKey("", F.path(),"")); + + return true; + } catch (const Poco::Exception &E) { + + } + return false; + } + } // namespace OpenWifi::Utils diff --git a/src/framework/utils.h b/src/framework/utils.h index 3979dca..9a9c939 100644 --- a/src/framework/utils.h +++ b/src/framework/utils.h @@ -247,4 +247,17 @@ namespace OpenWifi::Utils { return count; } + struct CSRCreationParameters { + std::string Country, Province, City, + Organization, CommonName; + int bits=2048; + }; + + struct CSRCreationResults { + std::string CSR, PublicKey, PrivateKey; + }; + + bool CreateX509CSR(const CSRCreationParameters & Parameters, CSRCreationResults & Results); + + bool VerifyECKey(const std::string &key); } // namespace OpenWifi::Utils From 5e630c8b99499bad6d64cf373aeb0891979888a9 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Wed, 13 Sep 2023 13:15:04 -0700 Subject: [PATCH 05/11] https://telecominfraproject.atlassian.net/browse/WIFI-7831 Signed-off-by: stephb9959 --- build | 2 +- src/framework/SubSystemServer.cpp | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/build b/build index d8263ee..e440e5c 100644 --- a/build +++ b/build @@ -1 +1 @@ -2 \ No newline at end of file +3 \ No newline at end of file diff --git a/src/framework/SubSystemServer.cpp b/src/framework/SubSystemServer.cpp index db4dc14..9dda8a3 100644 --- a/src/framework/SubSystemServer.cpp +++ b/src/framework/SubSystemServer.cpp @@ -37,6 +37,7 @@ namespace OpenWifi { P.cipherList = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; P.dhUse2048Bits = true; P.caLocation = cas_; + // P.securityLevel = auto Context = Poco::AutoPtr( new Poco::Net::Context(Poco::Net::Context::TLS_SERVER_USE, P)); @@ -75,11 +76,12 @@ namespace OpenWifi { L.fatal(fmt::format("Wrong Certificate({}) for Key({})", cert_file_, key_file_)); } - SSL_CTX_set_verify(SSLCtx, SSL_VERIFY_PEER, nullptr); + SSL_CTX_set_verify(SSLCtx, level_==Poco::Net::Context::VERIFY_NONE ? SSL_VERIFY_NONE : SSL_VERIFY_PEER, nullptr); + if (level_ == Poco::Net::Context::VERIFY_STRICT) { SSL_CTX_set_client_CA_list(SSLCtx, SSL_load_client_CA_file(client_cas_.c_str())); + SSL_CTX_enable_ct(SSLCtx, SSL_CT_VALIDATION_STRICT); } - SSL_CTX_enable_ct(SSLCtx, SSL_CT_VALIDATION_STRICT); SSL_CTX_dane_enable(SSLCtx); Context->enableSessionCache(); From 935515bb8934047f498ecf767e29bcd515afd560 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Tue, 19 Sep 2023 22:01:52 -0700 Subject: [PATCH 06/11] https://telecominfraproject.atlassian.net/browse/WIFI-7831 Signed-off-by: stephb9959 --- CMakeLists.txt | 2 +- build | 2 +- src/framework/ConfigurationValidator.cpp | 231 ++++++++++++++++++----- src/framework/ow_constants.h | 2 + src/framework/utils.cpp | 103 ++++++++++ src/framework/utils.h | 6 +- 6 files changed, 296 insertions(+), 50 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e5dab1e..4a44c1b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,7 @@ include_directories(/usr/local/include /usr/local/opt/openssl/include src inclu configure_file(src/ow_version.h.in ${PROJECT_SOURCE_DIR}/src/ow_version.h @ONLY) -add_definitions(-DPOCO_LOG_DEBUG="1") +add_definitions(-DPOCO_LOG_DEBUG="1" -DBOOST_NO_CXX98_FUNCTION_BASE=1) add_executable(owanalytics build diff --git a/build b/build index e440e5c..bf0d87a 100644 --- a/build +++ b/build @@ -1 +1 @@ -3 \ No newline at end of file +4 \ No newline at end of file diff --git a/src/framework/ConfigurationValidator.cpp b/src/framework/ConfigurationValidator.cpp index f1bb9ef..28b6e0c 100644 --- a/src/framework/ConfigurationValidator.cpp +++ b/src/framework/ConfigurationValidator.cpp @@ -34,6 +34,10 @@ static std::string DefaultUCentralSchema = R"foo( "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { + "strict": { + "type": "boolean", + "default": false + }, "uuid": { "type": "integer" }, @@ -114,6 +118,20 @@ static std::string DefaultUCentralSchema = R"foo( "random-password": { "type": "boolean", "default": false + }, + "beacon-advertisement": { + "type": "object", + "properties": { + "device-name": { + "type": "boolean" + }, + "device-serial": { + "type": "boolean" + }, + "network-id": { + "type": "integer" + } + } } } }, @@ -222,6 +240,52 @@ static std::string DefaultUCentralSchema = R"foo( } } }, + "interface.ssid.encryption": { + "type": "object", + "properties": { + "proto": { + "type": "string", + "enum": [ + "none", + "owe", + "owe-transition", + "psk", + "psk2", + "psk-mixed", + "psk2-radius", + "wpa", + "wpa2", + "wpa-mixed", + "sae", + "sae-mixed", + "wpa3", + "wpa3-192", + "wpa3-mixed" + ], + "examples": [ + "psk2" + ] + }, + "key": { + "type": "string", + "maxLength": 63, + "minLength": 8 + }, + "ieee80211w": { + "type": "string", + "enum": [ + "disabled", + "optional", + "required" + ], + "default": "disabled" + }, + "key-caching": { + "type": "boolean", + "default": true + } + } + }, "definitions": { "type": "object", "properties": { @@ -716,7 +780,8 @@ static std::string DefaultUCentralSchema = R"foo( "type": "string", "enum": [ "dynamic", - "static" + "static", + "none" ], "examples": [ "static" @@ -1006,52 +1071,6 @@ static std::string DefaultUCentralSchema = R"foo( } ] }, - "interface.ssid.encryption": { - "type": "object", - "properties": { - "proto": { - "type": "string", - "enum": [ - "none", - "owe", - "owe-transition", - "psk", - "psk2", - "psk-mixed", - "psk2-radius", - "wpa", - "wpa2", - "wpa-mixed", - "sae", - "sae-mixed", - "wpa3", - "wpa3-192", - "wpa3-mixed" - ], - "examples": [ - "psk2" - ] - }, - "key": { - "type": "string", - "maxLength": 63, - "minLength": 8 - }, - "ieee80211w": { - "type": "string", - "enum": [ - "disabled", - "optional", - "required" - ], - "default": "disabled" - }, - "key-caching": { - "type": "boolean", - "default": true - } - } - }, "interface.ssid.multi-psk": { "type": "object", "properties": { @@ -2020,6 +2039,11 @@ static std::string DefaultUCentralSchema = R"foo( "decription": "This option allows embedding custom vendor specific IEs inside the beacons of a BSS in AP mode.", "type": "string" }, + "tip-information-element": { + "decription": "The device will broadcast the TIP vendor IE inside its beacons if this option is enabled.", + "type": "boolean", + "default": true + }, "fils-discovery-interval": { "type": "integer", "default": 20, @@ -2443,6 +2467,24 @@ static std::string DefaultUCentralSchema = R"foo( "type": "boolean", "default": false }, + "mode": { + "type": "string", + "enum": [ + "radius", + "user" + ] + }, + "port-filter": { + "type": "array", + "items": { + "type": "string", + "examples": [ + { + "LAN1": null + } + ] + } + }, "server-certificate": { "type": "string" }, @@ -2454,6 +2496,77 @@ static std::string DefaultUCentralSchema = R"foo( "items": { "$ref": "#/$defs/interface.ssid.radius.local-user" } + }, + "radius": { + "type": "object", + "properties": { + "nas-identifier": { + "type": "string" + }, + "auth-server-addr": { + "type": "string", + "format": "uc-host", + "examples": [ + "192.168.1.10" + ] + }, + "auth-server-port": { + "type": "integer", + "maximum": 65535, + "minimum": 1024, + "examples": [ + 1812 + ] + }, + "auth-server-secret": { + "type": "string", + "examples": [ + "secret" + ] + }, + "acct-server-addr": { + "type": "string", + "format": "uc-host", + "examples": [ + "192.168.1.10" + ] + }, + "acct-server-port": { + "type": "integer", + "maximum": 65535, + "minimum": 1024, + "examples": [ + 1813 + ] + }, + "acct-server-secret": { + "type": "string", + "examples": [ + "secret" + ] + }, + "coa-server-addr": { + "type": "string", + "format": "uc-host", + "examples": [ + "192.168.1.10" + ] + }, + "coa-server-port": { + "type": "integer", + "maximum": 65535, + "minimum": 1024, + "examples": [ + 1814 + ] + }, + "coa-server-secret": { + "type": "string", + "examples": [ + "secret" + ] + } + } } } }, @@ -2777,6 +2890,12 @@ static std::string DefaultUCentralSchema = R"foo( } } }, + "services": { + "type": "array", + "items": { + "type": "string" + } + }, "classifier": { "type": "array", "items": { @@ -3019,6 +3138,24 @@ static std::string DefaultUCentralSchema = R"foo( "relay-server": { "type": "string", "format": "uc-ip" + }, + "circuit-id-format": { + "type": "string", + "enum": [ + "vlan-id", + "ap-mac", + "ssid" + ], + "default": "vlan-id" + }, + "remote-id-format": { + "type": "string", + "enum": [ + "vlan-id", + "ap-mac", + "ssid" + ], + "default": "ap-mac" } } } diff --git a/src/framework/ow_constants.h b/src/framework/ow_constants.h index 65ed7a3..370ab40 100644 --- a/src/framework/ow_constants.h +++ b/src/framework/ow_constants.h @@ -417,6 +417,8 @@ namespace OpenWifi::RESTAPI::Errors { static const struct msg NotAValidECKey { 1176, "Not a valid Signing Key." }; + static const struct msg NotAValidRadiusPoolType { 1177, "Not a valid RADIUS pool type." }; + static const struct msg SimulationDoesNotExist { 7000, "Simulation Instance ID does not exist." }; diff --git a/src/framework/utils.cpp b/src/framework/utils.cpp index b4486ba..6c34cdb 100644 --- a/src/framework/utils.cpp +++ b/src/framework/utils.cpp @@ -8,6 +8,12 @@ #include "framework/AppServiceRegistry.h" #include "framework/utils.h" +#include +#include +#include +#include +#include + namespace OpenWifi::Utils { bool NormalizeMac(std::string &Mac) { @@ -759,4 +765,101 @@ namespace OpenWifi::Utils { return false; } + bool VerifyRSAKey([[ + maybe_unused]] const std::string &key) { + try { + Poco::TemporaryFile F; + + std::ofstream of(F.path().c_str(), std::ios_base::trunc | std::ios_base::out | std::ios_base::binary); + of << key; + of.close(); + + auto Key = Poco::SharedPtr( + new Poco::Crypto::RSAKey("", F.path(),"")); + return true; + } catch (const Poco::Exception &E) { + + } + return false; + } + + bool ValidX509Certificate([[ + maybe_unused]] const std::string &Cert) { + try { + Poco::TemporaryFile F; + std::ofstream of(F.path().c_str(), std::ios_base::trunc | std::ios_base::out | std::ios_base::binary); + of << Cert; + of.close(); + + auto Key = Poco::SharedPtr( + new Poco::Crypto::X509Certificate(F.path())); + return true; + } catch (const Poco::Exception &E) { + + } + return false; + } + + bool ValidX509Certificate([[ + maybe_unused]] const std::vector &Certs) { + auto F = [](const std::string &C) -> bool { return ValidX509Certificate(C); }; + return std::all_of(Certs.begin(),Certs.end(), F); + } + + std::string generateStrongPassword(int minLength, int maxLength, int numDigits, int minLowercase, int minSpecial, int minUppercase) { + // Define character sets for each category + const std::string lowercaseChars = "abcdefghijklmnopqrstuvwxyz"; + const std::string uppercaseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const std::string digitChars = "0123456789"; + const std::string specialChars = "!@#$%^&*()_+[]{}|;:,.<>?"; + + // Check if parameters are valid + if (minLength < 1 || minLength > maxLength || minLowercase + minUppercase + numDigits + minSpecial > maxLength) { + return "Invalid parameters"; + } + + // Initialize random seed + std::random_device rd; + std::mt19937 g(rd()); + + // Initialize the password string + std::string password; + + // Generate the required number of each character type + for (int i = 0; i < minLowercase; ++i) { + password += lowercaseChars[g() % lowercaseChars.length()]; + } + for (int i = 0; i < minUppercase; ++i) { + password += uppercaseChars[g() % uppercaseChars.length()]; + } + for (int i = 0; i < numDigits; ++i) { + password += digitChars[g() % digitChars.length()]; + } + for (int i = 0; i < minSpecial; ++i) { + password += specialChars[g() % specialChars.length()]; + } + + // Calculate how many more characters are needed + int remainingLength = maxLength - (int)password.length(); + + // Generate random characters to fill the remaining length + for (int i = 0; i < remainingLength; ++i) { + int category = g() % 4; // Randomly select a category + if (category == 0) { + password += lowercaseChars[g() % lowercaseChars.length()]; + } else if (category == 1) { + password += uppercaseChars[g() % uppercaseChars.length()]; + } else if (category == 2) { + password += digitChars[g() % digitChars.length()]; + } else { + password += specialChars[g() % specialChars.length()]; + } + } + + // Shuffle the password to randomize the character order + std::shuffle(password.begin(), password.end(),g); + + return password; + } + } // namespace OpenWifi::Utils diff --git a/src/framework/utils.h b/src/framework/utils.h index 9a9c939..cf708bd 100644 --- a/src/framework/utils.h +++ b/src/framework/utils.h @@ -258,6 +258,10 @@ namespace OpenWifi::Utils { }; bool CreateX509CSR(const CSRCreationParameters & Parameters, CSRCreationResults & Results); - + std::string generateStrongPassword(int minLength, int maxLength, int numDigits, int minLowercase, int minSpecial, int minUppercase); bool VerifyECKey(const std::string &key); + bool VerifyRSAKey(const std::string &key); + bool ValidX509Certificate(const std::string &Cert); + bool ValidX509Certificate(const std::vector &Certs); + } // namespace OpenWifi::Utils From 70f8128504c5cfcc73919b719486698b1c713f4a Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sat, 23 Sep 2023 15:27:15 -0700 Subject: [PATCH 07/11] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- build | 2 +- src/framework/KafkaManager.cpp | 26 +++++++++++++++++--------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/build b/build index bf0d87a..7813681 100644 --- a/build +++ b/build @@ -1 +1 @@ -4 \ No newline at end of file +5 \ No newline at end of file diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index d32833e..d903652 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -99,9 +99,12 @@ namespace OpenWifi { try { auto Msg = dynamic_cast(Note.get()); if (Msg != nullptr) { - Producer.produce(cppkafka::MessageBuilder(Msg->Topic()) - .key(Msg->Key()) - .payload(Msg->Payload())); + auto NewMessage = cppkafka::MessageBuilder(Msg->Topic()); + NewMessage.key(Msg->Key()); + NewMessage.partition(0); + NewMessage.payload(Msg->Payload()); + Producer.produce(NewMessage); + Producer.flush(); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -157,17 +160,19 @@ namespace OpenWifi { }); bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); - auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20); + auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); Types::StringVec Topics; KafkaManager()->Topics(Topics); Consumer.subscribe(Topics); Running_ = true; + std::vector MsgVec; while (Running_) { try { - std::vector MsgVec = - Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100)); + MsgVec.clear(); + MsgVec.reserve(BatchSize); + MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000)); for (auto const &Msg : MsgVec) { if (!Msg) continue; @@ -177,12 +182,12 @@ namespace OpenWifi { fmt::format("Error: {}", Msg.get_error().to_string())); } if (!AutoCommit) - Consumer.async_commit(Msg); + Consumer.commit(Msg); continue; } KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); if (!AutoCommit) - Consumer.async_commit(Msg); + Consumer.commit(Msg); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -355,7 +360,10 @@ namespace OpenWifi { } [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { - return SystemInfoWrapper_ + PayLoad + "}"; + return fmt::format( R"lit({{ "system" : {{ "id" : {}, + "host" : "{}" }}, + "payload" : {} }})lit", MicroServiceID(), + MicroServicePrivateEndPoint(), PayLoad ) ; } uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, From 58949f50f45be545d098759d11588a862594bfe4 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 24 Sep 2023 10:59:08 -0700 Subject: [PATCH 08/11] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- build | 2 +- src/framework/KafkaManager.cpp | 154 +++++++++------------------------ src/framework/KafkaManager.h | 42 ++++----- 3 files changed, 59 insertions(+), 139 deletions(-) diff --git a/build b/build index 7813681..62f9457 100644 --- a/build +++ b/build @@ -1 +1 @@ -5 \ No newline at end of file +6 \ No newline at end of file diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index d903652..9381065 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -6,6 +6,7 @@ #include "fmt/format.h" #include "framework/MicroServiceFuncs.h" +#include "cppkafka/utils/consumer_dispatcher.h" namespace OpenWifi { @@ -159,45 +160,49 @@ namespace OpenWifi { } }); - bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); - auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); + // bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); + // auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); Types::StringVec Topics; - KafkaManager()->Topics(Topics); + std::for_each(Topics_.begin(),Topics_.end(), + [&](const std::string & T) { Topics.emplace_back(T); }); Consumer.subscribe(Topics); Running_ = true; std::vector MsgVec; - while (Running_) { - try { - MsgVec.clear(); - MsgVec.reserve(BatchSize); - MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000)); - for (auto const &Msg : MsgVec) { - if (!Msg) - continue; - if (Msg.get_error()) { - if (!Msg.is_eof()) { - poco_error(Logger_, - fmt::format("Error: {}", Msg.get_error().to_string())); + + Dispatcher_ = std::make_unique(Consumer); + + Dispatcher_->run( + // Callback executed whenever a new message is consumed + [&](cppkafka::Message msg) { + // Print the key (if any) + std::lock_guard G(ConsumerMutex_); + auto It = Notifiers_.find(msg.get_topic()); + if (It != Notifiers_.end()) { + const auto &FL = It->second; + for (const auto &[CallbackFunc, _] : FL) { + try { + CallbackFunc(msg.get_key(), msg.get_payload()); + } catch(const Poco::Exception &E) { + + } catch(...) { + } - if (!AutoCommit) - Consumer.commit(Msg); - continue; } - KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); - if (!AutoCommit) - Consumer.commit(Msg); } - } catch (const cppkafka::HandleException &E) { - poco_warning(Logger_, - fmt::format("Caught a Kafka exception (consumer): {}", E.what())); - } catch (const Poco::Exception &E) { - Logger_.log(E); - } catch (...) { - poco_error(Logger_, "std::exception"); + Consumer.commit(msg); + }, + // Whenever there's an error (other than the EOF soft error) + [&Logger_](cppkafka::Error error) { + poco_warning(Logger_,fmt::format("Error: {}", error.to_string())); + }, + // Whenever EOF is reached on a partition, print this + [&Logger_](cppkafka::ConsumerDispatcher::EndOfFile, const cppkafka::TopicPartition& topic_partition) { + poco_debug(Logger_,fmt::format("Partition {} EOF", topic_partition.get_partition())); } - } + ); + Consumer.unsubscribe(); poco_information(Logger_, "Stopped..."); } @@ -225,7 +230,6 @@ namespace OpenWifi { void KafkaConsumer::Start() { if (!Running_) { - Running_ = true; Worker_.start(*this); } } @@ -233,29 +237,16 @@ namespace OpenWifi { void KafkaConsumer::Stop() { if (Running_) { Running_ = false; - Worker_.wakeUp(); + if(Dispatcher_) { + Dispatcher_->stop(); + } Worker_.join(); } } - void KafkaDispatcher::Start() { - if (!Running_) { - Running_ = true; - Worker_.start(*this); - } - } - - void KafkaDispatcher::Stop() { - if (Running_) { - Running_ = false; - Queue_.wakeUpAll(); - Worker_.join(); - } - } - - auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic, + std::uint64_t KafkaConsumer::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { - std::lock_guard G(Mutex_); + std::lock_guard G(ConsumerMutex_); auto It = Notifiers_.find(Topic); if (It == Notifiers_.end()) { Types::TopicNotifyFunctionList L; @@ -264,11 +255,12 @@ namespace OpenWifi { } else { It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_)); } + Topics_.insert(Topic); return FunctionId_++; } - void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) { - std::lock_guard G(Mutex_); + void KafkaConsumer::UnregisterTopicWatcher(const std::string &Topic, int Id) { + std::lock_guard G(ConsumerMutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { Types::TopicNotifyFunctionList &L = It->second; @@ -280,56 +272,17 @@ namespace OpenWifi { } } - void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key, - const std::string & Payload) { - std::lock_guard G(Mutex_); - auto It = Notifiers_.find(Topic); - if (It != Notifiers_.end()) { - Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); - } - } - - void KafkaDispatcher::run() { - Poco::Logger &Logger_ = - Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel()); - poco_information(Logger_, "Starting..."); - Poco::AutoPtr Note(Queue_.waitDequeueNotification()); - Utils::SetThreadName("kafka:dispatch"); - while (Note && Running_) { - auto Msg = dynamic_cast(Note.get()); - if (Msg != nullptr) { - auto It = Notifiers_.find(Msg->Topic()); - if (It != Notifiers_.end()) { - const auto &FL = It->second; - for (const auto &[CallbackFunc, _] : FL) { - CallbackFunc(Msg->Key(), Msg->Payload()); - } - } - } - Note = Queue_.waitDequeueNotification(); - } - poco_information(Logger_, "Stopped..."); - } - - void KafkaDispatcher::Topics(std::vector &T) { - T.clear(); - for (const auto &[TopicName, _] : Notifiers_) - T.push_back(TopicName); - } - int KafkaManager::Start() { if (!KafkaEnabled_) return 0; ConsumerThr_.Start(); ProducerThr_.Start(); - Dispatcher_.Start(); return 0; } void KafkaManager::Stop() { if (KafkaEnabled_) { poco_information(Logger(), "Stopping..."); - Dispatcher_.Stop(); ProducerThr_.Stop(); ConsumerThr_.Stop(); poco_information(Logger(), "Stopped..."); @@ -353,12 +306,6 @@ namespace OpenWifi { } } - - void KafkaManager::Dispatch(const char *Topic, const std::string &Key, - const std::string &Payload) { - Dispatcher_.Dispatch(Topic, Key, Payload); - } - [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { return fmt::format( R"lit({{ "system" : {{ "id" : {}, "host" : "{}" }}, @@ -366,23 +313,6 @@ namespace OpenWifi { MicroServicePrivateEndPoint(), PayLoad ) ; } - uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, - Types::TopicNotifyFunction &F) { - if (KafkaEnabled_) { - return Dispatcher_.RegisterTopicWatcher(Topic, F); - } else { - return 0; - } - } - - void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { - if (KafkaEnabled_) { - Dispatcher_.UnregisterTopicWatcher(Topic, Id); - } - } - - void KafkaManager::Topics(std::vector &T) { Dispatcher_.Topics(T); } - void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) { poco_information( Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition())); diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index 31cf093..e6272d3 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -39,7 +39,7 @@ namespace OpenWifi { void Produce(const char *Topic, const std::string &Key, const std::string & Payload); private: - std::recursive_mutex Mutex_; + std::mutex Mutex_; Poco::Thread Worker_; mutable std::atomic_bool Running_ = false; Poco::NotificationQueue Queue_; @@ -47,33 +47,22 @@ namespace OpenWifi { class KafkaConsumer : public Poco::Runnable { public: - void run() override; void Start(); void Stop(); private: - std::recursive_mutex Mutex_; - Poco::Thread Worker_; + std::mutex ConsumerMutex_; + Types::NotifyTable Notifiers_; + Poco::Thread Worker_; mutable std::atomic_bool Running_ = false; - }; + uint64_t FunctionId_ = 1; + std::unique_ptr Dispatcher_; + std::set Topics_; - class KafkaDispatcher : public Poco::Runnable { - public: - void Start(); - void Stop(); - auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); + void run() override; + friend class KafkaManager; + std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, int Id); - void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload); - void run() override; - void Topics(std::vector &T); - - private: - std::recursive_mutex Mutex_; - Types::NotifyTable Notifiers_; - Poco::Thread Worker_; - mutable std::atomic_bool Running_ = false; - uint64_t FunctionId_ = 1; - Poco::NotificationQueue Queue_; }; class KafkaManager : public SubSystemServer { @@ -96,19 +85,20 @@ namespace OpenWifi { void PostMessage(const char *topic, const std::string &key, const Poco::JSON::Object &Object, bool WrapMessage = true); - void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload); [[nodiscard]] std::string WrapSystemId(const std::string & PayLoad); [[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; } - uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); - void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id); - void Topics(std::vector &T); + inline std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { + return ConsumerThr_.RegisterTopicWatcher(Topic,F); + } + inline void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { + return ConsumerThr_.UnregisterTopicWatcher(Topic,Id); + } private: bool KafkaEnabled_ = false; std::string SystemInfoWrapper_; KafkaProducer ProducerThr_; KafkaConsumer ConsumerThr_; - KafkaDispatcher Dispatcher_; void PartitionAssignment(const cppkafka::TopicPartitionList &partitions); void PartitionRevocation(const cppkafka::TopicPartitionList &partitions); From ca3c43d12562b2211c4130e27c01350c5d74012e Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 24 Sep 2023 11:17:37 -0700 Subject: [PATCH 09/11] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- src/APStats.cpp | 10 +++++----- src/HealthReceiver.cpp | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/APStats.cpp b/src/APStats.cpp index 8abb9ad..478e170 100644 --- a/src/APStats.cpp +++ b/src/APStats.cpp @@ -112,7 +112,7 @@ namespace OpenWifi { DI_.connected = true; AnalyticsObjects::DeviceTimePoint DTP; - poco_information(Logger(), fmt::format("{}: stats message.", DI_.serialNumber)); + poco_trace(Logger(), fmt::format("{}: stats message.", DI_.serialNumber)); // find radios first to get associations. try { @@ -573,7 +573,7 @@ namespace OpenWifi { try { if (Connection->contains("ping")) { got_connection = true; - poco_debug(Logger(), fmt::format("{}: ping message.", DI_.serialNumber)); + poco_trace(Logger(), fmt::format("{}: ping message.", DI_.serialNumber)); DI_.connected = true; DI_.lastPing = Utils::Now(); auto ping = (*Connection)["ping"]; @@ -589,13 +589,13 @@ namespace OpenWifi { } } } else if (Connection->contains("disconnection")) { - poco_debug(Logger(), fmt::format("{}: disconnection message.", DI_.serialNumber)); + poco_trace(Logger(), fmt::format("{}: disconnection message.", DI_.serialNumber)); auto Disconnection = (*Connection)["disconnection"]; GetJSON("timestamp", Disconnection, DI_.lastDisconnection, (uint64_t)0); got_base = got_health = got_connection = false; DI_.connected = false; } else if (Connection->contains("capabilities")) { - poco_debug(Logger(), fmt::format("{}: connection message.", DI_.serialNumber)); + poco_trace(Logger(), fmt::format("{}: connection message.", DI_.serialNumber)); got_connection = true; DI_.connected = true; DI_.lastConnection = Utils::Now(); @@ -621,7 +621,7 @@ namespace OpenWifi { got_health = true; GetJSON("timestamp", *Health, DI_.lastHealth, (uint64_t)0); GetJSON("sanity", *Health, DI_.health, (uint64_t)0); - poco_debug(Logger(), fmt::format("{}: health message.", DI_.serialNumber)); + poco_trace(Logger(), fmt::format("{}: health message.", DI_.serialNumber)); } catch (...) { poco_information(Logger(), fmt::format("{}: error parsing health message.", DI_.serialNumber)); diff --git a/src/HealthReceiver.cpp b/src/HealthReceiver.cpp index b1d23a0..e64631f 100644 --- a/src/HealthReceiver.cpp +++ b/src/HealthReceiver.cpp @@ -78,7 +78,7 @@ namespace OpenWifi { void HealthReceiver::HealthReceived(const std::string &Key, const std::string &Payload) { std::lock_guard G(Mutex_); - poco_debug(Logger(), fmt::format("Device({}): Health message.", Key)); + poco_trace(Logger(), fmt::format("Device({}): Health message.", Key)); Queue_.enqueueNotification(new HealthMessage(Key, Payload)); } } // namespace OpenWifi \ No newline at end of file From 5ee9c9ed65888f9c63baa88a3590c1ee39df8172 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 24 Sep 2023 11:20:02 -0700 Subject: [PATCH 10/11] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- src/DeviceStatusReceiver.cpp | 2 +- src/StateReceiver.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DeviceStatusReceiver.cpp b/src/DeviceStatusReceiver.cpp index 5b3eaad..19d9dfe 100644 --- a/src/DeviceStatusReceiver.cpp +++ b/src/DeviceStatusReceiver.cpp @@ -80,7 +80,7 @@ namespace OpenWifi { void DeviceStatusReceiver::DeviceStatusReceived(const std::string &Key, const std::string &Payload) { std::lock_guard G(Mutex_); - poco_debug(Logger(), fmt::format("Device({}): Connection/Ping message.", Key)); + poco_trace(Logger(), fmt::format("Device({}): Connection/Ping message.", Key)); Queue_.enqueueNotification(new DeviceStatusMessage(Key, Payload)); } } // namespace OpenWifi \ No newline at end of file diff --git a/src/StateReceiver.cpp b/src/StateReceiver.cpp index 8cd45e1..4ad7dd3 100644 --- a/src/StateReceiver.cpp +++ b/src/StateReceiver.cpp @@ -61,7 +61,7 @@ namespace OpenWifi { void StateReceiver::StateReceived(const std::string &Key, const std::string &Payload) { std::lock_guard G(Mutex_); - poco_debug(Logger(), fmt::format("Device({}): State message.", Key)); + poco_trace(Logger(), fmt::format("Device({}): State message.", Key)); Queue_.enqueueNotification(new StateMessage(Key, Payload)); } From 6d03e7e9f403d10d9885c566b56a09b7feacba31 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 24 Sep 2023 11:51:40 -0700 Subject: [PATCH 11/11] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- src/VenueCoordinator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/VenueCoordinator.cpp b/src/VenueCoordinator.cpp index 8d9d3dc..4a88b1d 100644 --- a/src/VenueCoordinator.cpp +++ b/src/VenueCoordinator.cpp @@ -60,7 +60,7 @@ namespace OpenWifi { Utils::SetThreadName("venue-coord"); Running_ = true; while (Running_) { - Poco::Thread::trySleep(20000); + Poco::Thread::trySleep(60000); if (!Running_) break;