Initial commit

This commit is contained in:
stephb9959
2022-03-14 09:42:14 -07:00
parent 32077133e9
commit d9599b4707
9 changed files with 191 additions and 14 deletions

View File

@@ -83,7 +83,7 @@ add_executable(owanalytics
src/RESTAPI/RESTAPI_routers.cpp
src/Daemon.cpp src/Daemon.h
src/Dashboard.h src/Dashboard.cpp
src/StorageService.cpp src/StorageService.h src/RESTObjects/RESTAPI_AnalyticsObjects.cpp src/RESTObjects/RESTAPI_AnalyticsObjects.h src/StateReceiver.cpp src/StateReceiver.h src/VenueWatcher.cpp src/VenueWatcher.h src/VenueCoordinator.cpp src/VenueCoordinator.h src/sdks/SDK_prov.cpp src/sdks/SDK_prov.h src/storage/storage_boards.cpp src/storage/storage_boards.h src/RESTAPI/RESTAPI_board_list_handler.cpp src/RESTAPI/RESTAPI_board_list_handler.h src/RESTAPI/RESTAPI_board_handler.cpp src/RESTAPI/RESTAPI_board_handler.h src/RESTAPI/RESTAPI_analytics_db_helpers.h src/APStats.cpp src/APStats.h src/dict_ssid.h src/dict_ue.h src/dict_bssid.h)
src/StorageService.cpp src/StorageService.h src/RESTObjects/RESTAPI_AnalyticsObjects.cpp src/RESTObjects/RESTAPI_AnalyticsObjects.h src/StateReceiver.cpp src/StateReceiver.h src/VenueWatcher.cpp src/VenueWatcher.h src/VenueCoordinator.cpp src/VenueCoordinator.h src/sdks/SDK_prov.cpp src/sdks/SDK_prov.h src/storage/storage_boards.cpp src/storage/storage_boards.h src/RESTAPI/RESTAPI_board_list_handler.cpp src/RESTAPI/RESTAPI_board_list_handler.h src/RESTAPI/RESTAPI_board_handler.cpp src/RESTAPI/RESTAPI_board_handler.h src/RESTAPI/RESTAPI_analytics_db_helpers.h src/APStats.cpp src/APStats.h src/dict_ssid.h src/dict_ue.h src/dict_bssid.h src/DeviceStatusReceiver.cpp src/DeviceStatusReceiver.h)
target_link_libraries(owanalytics PUBLIC
${Poco_LIBRARIES} ${MySQL_LIBRARIES}

View File

@@ -6,9 +6,14 @@
namespace OpenWifi {
void AP::Update(std::shared_ptr<nlohmann::json> &State) {
std::cout << "MAC: " << Utils::IntToSerialNumber(mac_) << std::endl;
void AP::UpdateStats(std::shared_ptr<nlohmann::json> &State) {
std::cout << "Stats update for MAC: " << Utils::IntToSerialNumber(mac_) << std::endl;
std::cout << *State << std::endl;
}
void AP::UpdateConnection(std::shared_ptr<nlohmann::json> &Connection) {
std::cout << "Connection update for MAC: " << Utils::IntToSerialNumber(mac_) << std::endl;
std::cout << *Connection << std::endl;
}
}

View File

@@ -55,7 +55,9 @@ namespace OpenWifi {
}
void Update(std::shared_ptr<nlohmann::json> & State);
void UpdateStats(std::shared_ptr<nlohmann::json> & State);
void UpdateConnection(std::shared_ptr<nlohmann::json> & Connection);
private:
uint64_t mac_=0;

View File

@@ -15,6 +15,7 @@
#include "StorageService.h"
#include "VenueCoordinator.h"
#include "StateReceiver.h"
#include "DeviceStatusReceiver.h"
namespace OpenWifi {
class Daemon *Daemon::instance_ = nullptr;
@@ -29,6 +30,7 @@ namespace OpenWifi {
SubSystemVec{
OpenWifi::StorageService(),
StateReceiver(),
DeviceStatusReceiver(),
VenueCoordinator()
});
}

View File

@@ -0,0 +1,85 @@
//
// Created by stephane bourque on 2022-03-13.
//
#include "DeviceStatusReceiver.h"
#include "VenueWatcher.h"
namespace OpenWifi {
int DeviceStatusReceiver::Start() {
Running_ = true;
Types::TopicNotifyFunction F = [this](const std::string &Key, const std::string &Payload) { this->DeviceStatusReceived(Key,Payload); };
DeviceStateWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F);
Worker_.start(*this);
return 0;
}
void DeviceStatusReceiver::Stop() {
Running_ = false;
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, DeviceStateWatcherId_);
Queue_.wakeUpAll();
Worker_.join();
}
static void AssignIfPresent(const Poco::JSON::Object::Ptr & Obj, const std::string &Key, std::string &Value) {
if(Obj->has(Key)) {
Value = Obj->get(Key).toString();
}
}
void DeviceStatusReceiver::run() {
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) {
auto Msg = dynamic_cast<DeviceStatusMessage *>(Note.get());
if(Msg!= nullptr) {
try {
nlohmann::json msg = nlohmann::json::parse(Msg->Payload());
if (msg.contains(uCentralProtocol::PAYLOAD)) {
auto payload = msg[uCentralProtocol::PAYLOAD];
uint64_t SerialNumber = Utils::SerialNumberToInt(Msg->Key());
std::lock_guard G(Mutex_);
for(const auto &[venue,devices]:Notifiers_) {
if(devices.find(SerialNumber)!=cend(devices)) {
auto connection_data = std::make_shared<nlohmann::json>(payload);
venue->PostConnection(SerialNumber, connection_data);
break;
}
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
} catch (...) {
}
} else {
}
Note = Queue_.waitDequeueNotification();
}
}
void DeviceStatusReceiver::Register(const std::vector <uint64_t> &SerialNumber, VenueWatcher *VW) {
std::lock_guard G(Mutex_);
std::set<uint64_t> NewSerialNumbers;
std::copy(SerialNumber.begin(),SerialNumber.end(),std::inserter(NewSerialNumbers,NewSerialNumbers.begin()));
auto it = Notifiers_.find(VW);
if(it==end(Notifiers_))
Notifiers_[VW]=NewSerialNumbers;
else
it->second = NewSerialNumbers;
}
void DeviceStatusReceiver::DeRegister(VenueWatcher *VW) {
std::lock_guard G(Mutex_);
Notifiers_.erase(VW);
}
void DeviceStatusReceiver::DeviceStatusReceived(const std::string &Key, const std::string &Payload) {
std::lock_guard G(Mutex_);
Logger().information(Poco::format("Device(%s): Connection/Ping message.", Key));
Queue_.enqueueNotification( new DeviceStatusMessage(Key,Payload));
}
}

View File

@@ -0,0 +1,56 @@
//
// Created by stephane bourque on 2022-03-10.
//
#pragma once
#include "framework/MicroService.h"
// #include "VenueWatcher.h"
namespace OpenWifi {
class DeviceStatusMessage : public Poco::Notification {
public:
explicit DeviceStatusMessage(const std::string &Key, const std::string &Payload ) :
Key_(Key),
Payload_(Payload) {}
const std::string & Key() { return Key_; }
const std::string & Payload() { return Payload_; }
private:
std::string Key_;
std::string Payload_;
};
class VenueWatcher;
class DeviceStatusReceiver : public SubSystemServer, Poco::Runnable {
public:
static auto instance() {
static auto instance_ = new DeviceStatusReceiver;
return instance_;
}
int Start() override;
void Stop() override;
void run() override;
void DeviceStatusReceived( const std::string & Key, const std::string & Payload);
void Register(const std::vector<uint64_t> & SerialNumber, VenueWatcher *VW);
void DeRegister(VenueWatcher *VW);
private:
// map of mac(as int), list of (id,func)
std::map<VenueWatcher *, std::set<uint64_t>> Notifiers_;
uint64_t DeviceStateWatcherId_=0;
Poco::NotificationQueue Queue_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
DeviceStatusReceiver() noexcept:
SubSystemServer("DeviceStatus", "DEV-STATUS-RECEIVER", "devicestatus.receiver")
{
}
};
inline auto DeviceStatusReceiver() { return DeviceStatusReceiver::instance(); }
}

View File

@@ -34,10 +34,11 @@ namespace OpenWifi {
if (payload.contains("state") && payload.contains("serial")) {
auto serialNumber = payload["serial"].get<std::string>();
auto state = std::make_shared<nlohmann::json>(payload["state"]);
std::lock_guard G(Mutex_);
auto it = Notifiers_.find(Utils::SerialNumberToInt(serialNumber));
if(it!=Notifiers_.end()) {
for(const auto &i:it->second) {
i->Post(Utils::SerialNumberToInt(serialNumber), state);
i->PostState(Utils::SerialNumberToInt(serialNumber), state);
}
}
}

View File

@@ -4,16 +4,20 @@
#include "VenueWatcher.h"
#include "StateReceiver.h"
#include "DeviceStatusReceiver.h"
namespace OpenWifi {
void VenueWatcher::Start() {
for(const auto &i:SerialNumbers_)
StateReceiver()->Register(i,this);
DeviceStatusReceiver()->Register(SerialNumbers_,this);
Worker_.start(*this);
}
void VenueWatcher::Stop() {
DeviceStatusReceiver()->DeRegister(this);
Running_ = false;
Queue_.wakeUpAll();
Worker_.join();
@@ -27,11 +31,19 @@ namespace OpenWifi {
if(MsgContent!= nullptr) {
try {
auto State = MsgContent->Payload();
if(MsgContent->Type()==VenueMessage::connection) {
auto It = APs_.find(MsgContent->SerialNumber());
if(It!=end(APs_)) {
std::thread T([&] {It->second->Update(MsgContent->Payload());});
std::thread T([&] {It->second->UpdateConnection(MsgContent->Payload());});
T.detach();
}
} else if(MsgContent->Type()==VenueMessage::state) {
auto It = APs_.find(MsgContent->SerialNumber());
if(It!=end(APs_)) {
std::thread T([&] {It->second->UpdateStats(MsgContent->Payload());});
T.detach();
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
} catch (...) {

View File

@@ -11,15 +11,24 @@ namespace OpenWifi {
class VenueMessage : public Poco::Notification {
public:
explicit VenueMessage(uint64_t SerialNumber, std::shared_ptr<nlohmann::json> &M ) :
enum MsgType {
connection,
state
};
explicit VenueMessage(uint64_t SerialNumber, MsgType Msg, std::shared_ptr<nlohmann::json> &M ) :
Payload_(M),
Type_(Msg),
SerialNumber_(SerialNumber) {
}
inline std::shared_ptr<nlohmann::json> & Payload() { return Payload_; }
inline auto SerialNumber() { return SerialNumber_; }
inline const auto SerialNumber() { return SerialNumber_; }
inline uint64_t Type() { return Type_; }
private:
std::shared_ptr<nlohmann::json> Payload_;
uint64_t SerialNumber_=0;
MsgType Type_;
};
class VenueWatcher : public Poco::Runnable {
@@ -39,9 +48,14 @@ namespace OpenWifi {
}
}
void Post(uint64_t SerialNumber, std::shared_ptr<nlohmann::json> &Msg) {
inline void PostState(uint64_t SerialNumber, std::shared_ptr<nlohmann::json> &Msg) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification(new VenueMessage(SerialNumber, Msg));
Queue_.enqueueNotification(new VenueMessage(SerialNumber, VenueMessage::state, Msg));
}
inline void PostConnection(uint64_t SerialNumber, std::shared_ptr<nlohmann::json> &Msg) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification(new VenueMessage(SerialNumber, VenueMessage::connection, Msg));
}
void Start();