diff --git a/CMakeLists.txt b/CMakeLists.txt index 58ae59d..d77190e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,7 +83,7 @@ add_executable(owanalytics src/RESTAPI/RESTAPI_routers.cpp src/Daemon.cpp src/Daemon.h src/Dashboard.h src/Dashboard.cpp - src/StorageService.cpp src/StorageService.h src/RESTObjects/RESTAPI_AnalyticsObjects.cpp src/RESTObjects/RESTAPI_AnalyticsObjects.h src/StateReceiver.cpp src/StateReceiver.h src/VenueWatcher.cpp src/VenueWatcher.h src/VenueCoordinator.cpp src/VenueCoordinator.h src/sdks/SDK_prov.cpp src/sdks/SDK_prov.h src/storage/storage_boards.cpp src/storage/storage_boards.h src/RESTAPI/RESTAPI_board_list_handler.cpp src/RESTAPI/RESTAPI_board_list_handler.h src/RESTAPI/RESTAPI_board_handler.cpp src/RESTAPI/RESTAPI_board_handler.h src/RESTAPI/RESTAPI_analytics_db_helpers.h src/APStats.cpp src/APStats.h src/dict_ssid.h src/dict_ue.h src/dict_bssid.h) + src/StorageService.cpp src/StorageService.h src/RESTObjects/RESTAPI_AnalyticsObjects.cpp src/RESTObjects/RESTAPI_AnalyticsObjects.h src/StateReceiver.cpp src/StateReceiver.h src/VenueWatcher.cpp src/VenueWatcher.h src/VenueCoordinator.cpp src/VenueCoordinator.h src/sdks/SDK_prov.cpp src/sdks/SDK_prov.h src/storage/storage_boards.cpp src/storage/storage_boards.h src/RESTAPI/RESTAPI_board_list_handler.cpp src/RESTAPI/RESTAPI_board_list_handler.h src/RESTAPI/RESTAPI_board_handler.cpp src/RESTAPI/RESTAPI_board_handler.h src/RESTAPI/RESTAPI_analytics_db_helpers.h src/APStats.cpp src/APStats.h src/dict_ssid.h src/dict_ue.h src/dict_bssid.h src/DeviceStatusReceiver.cpp src/DeviceStatusReceiver.h) target_link_libraries(owanalytics PUBLIC ${Poco_LIBRARIES} ${MySQL_LIBRARIES} diff --git a/src/APStats.cpp b/src/APStats.cpp index f7530ee..b05a66c 100644 --- a/src/APStats.cpp +++ b/src/APStats.cpp @@ -6,9 +6,14 @@ namespace OpenWifi { - void AP::Update(std::shared_ptr &State) { - std::cout << "MAC: " << Utils::IntToSerialNumber(mac_) << std::endl; + void AP::UpdateStats(std::shared_ptr &State) { + std::cout << "Stats update for MAC: " << Utils::IntToSerialNumber(mac_) << std::endl; std::cout << *State << std::endl; } + void AP::UpdateConnection(std::shared_ptr &Connection) { + std::cout << "Connection update for MAC: " << Utils::IntToSerialNumber(mac_) << std::endl; + std::cout << *Connection << std::endl; + } + } \ No newline at end of file diff --git a/src/APStats.h b/src/APStats.h index 2823235..badaffd 100644 --- a/src/APStats.h +++ b/src/APStats.h @@ -55,7 +55,9 @@ namespace OpenWifi { } - void Update(std::shared_ptr & State); + void UpdateStats(std::shared_ptr & State); + void UpdateConnection(std::shared_ptr & Connection); + private: uint64_t mac_=0; diff --git a/src/Daemon.cpp b/src/Daemon.cpp index a925737..b6994aa 100644 --- a/src/Daemon.cpp +++ b/src/Daemon.cpp @@ -15,6 +15,7 @@ #include "StorageService.h" #include "VenueCoordinator.h" #include "StateReceiver.h" +#include "DeviceStatusReceiver.h" namespace OpenWifi { class Daemon *Daemon::instance_ = nullptr; @@ -29,6 +30,7 @@ namespace OpenWifi { SubSystemVec{ OpenWifi::StorageService(), StateReceiver(), + DeviceStatusReceiver(), VenueCoordinator() }); } diff --git a/src/DeviceStatusReceiver.cpp b/src/DeviceStatusReceiver.cpp new file mode 100644 index 0000000..0564a22 --- /dev/null +++ b/src/DeviceStatusReceiver.cpp @@ -0,0 +1,85 @@ +// +// Created by stephane bourque on 2022-03-13. +// + +#include "DeviceStatusReceiver.h" +#include "VenueWatcher.h" + +namespace OpenWifi { + int DeviceStatusReceiver::Start() { + Running_ = true; + Types::TopicNotifyFunction F = [this](const std::string &Key, const std::string &Payload) { this->DeviceStatusReceived(Key,Payload); }; + DeviceStateWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F); + Worker_.start(*this); + return 0; + } + + void DeviceStatusReceiver::Stop() { + Running_ = false; + KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, DeviceStateWatcherId_); + Queue_.wakeUpAll(); + Worker_.join(); + } + + static void AssignIfPresent(const Poco::JSON::Object::Ptr & Obj, const std::string &Key, std::string &Value) { + if(Obj->has(Key)) { + Value = Obj->get(Key).toString(); + } + } + + void DeviceStatusReceiver::run() { + Poco::AutoPtr Note(Queue_.waitDequeueNotification()); + while(Note && Running_) { + auto Msg = dynamic_cast(Note.get()); + if(Msg!= nullptr) { + try { + nlohmann::json msg = nlohmann::json::parse(Msg->Payload()); + if (msg.contains(uCentralProtocol::PAYLOAD)) { + auto payload = msg[uCentralProtocol::PAYLOAD]; + + uint64_t SerialNumber = Utils::SerialNumberToInt(Msg->Key()); + std::lock_guard G(Mutex_); + + for(const auto &[venue,devices]:Notifiers_) { + if(devices.find(SerialNumber)!=cend(devices)) { + auto connection_data = std::make_shared(payload); + venue->PostConnection(SerialNumber, connection_data); + break; + } + } + } + } catch (const Poco::Exception &E) { + Logger().log(E); + } catch (...) { + + } + } else { + + } + Note = Queue_.waitDequeueNotification(); + } + } + + void DeviceStatusReceiver::Register(const std::vector &SerialNumber, VenueWatcher *VW) { + std::lock_guard G(Mutex_); + + std::set NewSerialNumbers; + std::copy(SerialNumber.begin(),SerialNumber.end(),std::inserter(NewSerialNumbers,NewSerialNumbers.begin())); + auto it = Notifiers_.find(VW); + if(it==end(Notifiers_)) + Notifiers_[VW]=NewSerialNumbers; + else + it->second = NewSerialNumbers; + } + + void DeviceStatusReceiver::DeRegister(VenueWatcher *VW) { + std::lock_guard G(Mutex_); + Notifiers_.erase(VW); + } + + void DeviceStatusReceiver::DeviceStatusReceived(const std::string &Key, const std::string &Payload) { + std::lock_guard G(Mutex_); + Logger().information(Poco::format("Device(%s): Connection/Ping message.", Key)); + Queue_.enqueueNotification( new DeviceStatusMessage(Key,Payload)); + } +} \ No newline at end of file diff --git a/src/DeviceStatusReceiver.h b/src/DeviceStatusReceiver.h new file mode 100644 index 0000000..7fd5283 --- /dev/null +++ b/src/DeviceStatusReceiver.h @@ -0,0 +1,56 @@ +// +// Created by stephane bourque on 2022-03-10. +// + +#pragma once + +#include "framework/MicroService.h" +// #include "VenueWatcher.h" + +namespace OpenWifi { + class DeviceStatusMessage : public Poco::Notification { + public: + explicit DeviceStatusMessage(const std::string &Key, const std::string &Payload ) : + Key_(Key), + Payload_(Payload) {} + const std::string & Key() { return Key_; } + const std::string & Payload() { return Payload_; } + private: + std::string Key_; + std::string Payload_; + }; + + class VenueWatcher; + + class DeviceStatusReceiver : public SubSystemServer, Poco::Runnable { + public: + + static auto instance() { + static auto instance_ = new DeviceStatusReceiver; + return instance_; + } + + int Start() override; + void Stop() override; + void run() override; + void DeviceStatusReceived( const std::string & Key, const std::string & Payload); + void Register(const std::vector & SerialNumber, VenueWatcher *VW); + void DeRegister(VenueWatcher *VW); + + private: + // map of mac(as int), list of (id,func) + std::map> Notifiers_; + uint64_t DeviceStateWatcherId_=0; + Poco::NotificationQueue Queue_; + Poco::Thread Worker_; + std::atomic_bool Running_=false; + + DeviceStatusReceiver() noexcept: + SubSystemServer("DeviceStatus", "DEV-STATUS-RECEIVER", "devicestatus.receiver") + { + } + }; + + inline auto DeviceStatusReceiver() { return DeviceStatusReceiver::instance(); } + +} \ No newline at end of file diff --git a/src/StateReceiver.cpp b/src/StateReceiver.cpp index f65c04a..d757793 100644 --- a/src/StateReceiver.cpp +++ b/src/StateReceiver.cpp @@ -34,10 +34,11 @@ namespace OpenWifi { if (payload.contains("state") && payload.contains("serial")) { auto serialNumber = payload["serial"].get(); auto state = std::make_shared(payload["state"]); + std::lock_guard G(Mutex_); auto it = Notifiers_.find(Utils::SerialNumberToInt(serialNumber)); if(it!=Notifiers_.end()) { for(const auto &i:it->second) { - i->Post(Utils::SerialNumberToInt(serialNumber), state); + i->PostState(Utils::SerialNumberToInt(serialNumber), state); } } } diff --git a/src/VenueWatcher.cpp b/src/VenueWatcher.cpp index 2e1a663..5feb476 100644 --- a/src/VenueWatcher.cpp +++ b/src/VenueWatcher.cpp @@ -4,16 +4,20 @@ #include "VenueWatcher.h" #include "StateReceiver.h" +#include "DeviceStatusReceiver.h" namespace OpenWifi { void VenueWatcher::Start() { for(const auto &i:SerialNumbers_) StateReceiver()->Register(i,this); + + DeviceStatusReceiver()->Register(SerialNumbers_,this); Worker_.start(*this); } void VenueWatcher::Stop() { + DeviceStatusReceiver()->DeRegister(this); Running_ = false; Queue_.wakeUpAll(); Worker_.join(); @@ -27,10 +31,18 @@ namespace OpenWifi { if(MsgContent!= nullptr) { try { auto State = MsgContent->Payload(); - auto It = APs_.find(MsgContent->SerialNumber()); - if(It!=end(APs_)) { - std::thread T([&] {It->second->Update(MsgContent->Payload());}); - T.detach(); + if(MsgContent->Type()==VenueMessage::connection) { + auto It = APs_.find(MsgContent->SerialNumber()); + if(It!=end(APs_)) { + std::thread T([&] {It->second->UpdateConnection(MsgContent->Payload());}); + T.detach(); + } + } else if(MsgContent->Type()==VenueMessage::state) { + auto It = APs_.find(MsgContent->SerialNumber()); + if(It!=end(APs_)) { + std::thread T([&] {It->second->UpdateStats(MsgContent->Payload());}); + T.detach(); + } } } catch (const Poco::Exception &E) { Logger().log(E); diff --git a/src/VenueWatcher.h b/src/VenueWatcher.h index e08ac0d..469117c 100644 --- a/src/VenueWatcher.h +++ b/src/VenueWatcher.h @@ -11,15 +11,24 @@ namespace OpenWifi { class VenueMessage : public Poco::Notification { public: - explicit VenueMessage(uint64_t SerialNumber, std::shared_ptr &M ) : + enum MsgType { + connection, + state + }; + + explicit VenueMessage(uint64_t SerialNumber, MsgType Msg, std::shared_ptr &M ) : Payload_(M), + Type_(Msg), SerialNumber_(SerialNumber) { } inline std::shared_ptr & Payload() { return Payload_; } - inline auto SerialNumber() { return SerialNumber_; } + inline const auto SerialNumber() { return SerialNumber_; } + inline uint64_t Type() { return Type_; } + private: std::shared_ptr Payload_; - uint64_t SerialNumber_=0; + uint64_t SerialNumber_=0; + MsgType Type_; }; class VenueWatcher : public Poco::Runnable { @@ -39,9 +48,14 @@ namespace OpenWifi { } } - void Post(uint64_t SerialNumber, std::shared_ptr &Msg) { + inline void PostState(uint64_t SerialNumber, std::shared_ptr &Msg) { std::lock_guard G(Mutex_); - Queue_.enqueueNotification(new VenueMessage(SerialNumber, Msg)); + Queue_.enqueueNotification(new VenueMessage(SerialNumber, VenueMessage::state, Msg)); + } + + inline void PostConnection(uint64_t SerialNumber, std::shared_ptr &Msg) { + std::lock_guard G(Mutex_); + Queue_.enqueueNotification(new VenueMessage(SerialNumber, VenueMessage::connection, Msg)); } void Start();