stephb9959
2022-10-30 10:15:38 -07:00
parent a18cb37671
commit fda8afd90c
9 changed files with 198 additions and 207 deletions

2
build
View File

@@ -1 +1 @@
1 3

View File

@@ -48,6 +48,7 @@ namespace OpenWifi {
int ALBHealthCheckServer::Start() { int ALBHealthCheckServer::Start() {
if(MicroServiceConfigGetBool("alb.enable",false)) { if(MicroServiceConfigGetBool("alb.enable",false)) {
poco_information(Logger(),"Starting...");
Running_=true; Running_=true;
Port_ = (int)MicroServiceConfigGetInt("alb.port",15015); Port_ = (int)MicroServiceConfigGetInt("alb.port",15015);
Socket_ = std::make_unique<Poco::Net::ServerSocket>(Port_); Socket_ = std::make_unique<Poco::Net::ServerSocket>(Port_);

View File

@@ -23,7 +23,7 @@ namespace OpenWifi {
void handleRequest([[maybe_unused]] Poco::Net::HTTPServerRequest& Request, Poco::Net::HTTPServerResponse& Response) override; void handleRequest([[maybe_unused]] Poco::Net::HTTPServerRequest& Request, Poco::Net::HTTPServerResponse& Response) override;
private: private:
[[maybe_unused]] Poco::Logger & Logger_; Poco::Logger & Logger_;
uint64_t id_; uint64_t id_;
}; };
@@ -34,7 +34,7 @@ namespace OpenWifi {
ALBRequestHandler* createRequestHandler(const Poco::Net::HTTPServerRequest& request) override; ALBRequestHandler* createRequestHandler(const Poco::Net::HTTPServerRequest& request) override;
private: private:
Poco::Logger &Logger_; Poco::Logger &Logger_;
inline static std::atomic_uint64_t req_id_=1; inline static std::atomic_uint64_t req_id_=1;
}; };

View File

@@ -32,7 +32,7 @@ namespace OpenWifi {
nlohmann::json RootSchema_; nlohmann::json RootSchema_;
ConfigurationValidator(): ConfigurationValidator():
SubSystemServer("configvalidator", "CFG-VALIDATOR", "config.validator") { SubSystemServer("ConfigValidator", "CFG-VALIDATOR", "config.validator") {
} }
}; };

View File

@@ -30,6 +30,7 @@ namespace OpenWifi {
}; };
void EventBusManager::Start() { void EventBusManager::Start() {
poco_information(Logger(),"Starting...");
if(KafkaManager()->Enabled()) { if(KafkaManager()->Enabled()) {
Thread_.start(*this); Thread_.start(*this);
} }

View File

@@ -67,9 +67,9 @@ namespace OpenWifi {
KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable",false); KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable",false);
} }
inline void KafkaProducer::run() { inline void KafkaProducer::run() {
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-PRODUCER", KafkaManager()->Logger().getChannel());
poco_information(Logger_,"Starting...");
Utils::SetThreadName("Kafka:Prod"); Utils::SetThreadName("Kafka:Prod");
cppkafka::Configuration Config({ cppkafka::Configuration Config({
@@ -99,19 +99,24 @@ namespace OpenWifi {
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload())); cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
} }
} catch (const cppkafka::HandleException &E) { } catch (const cppkafka::HandleException &E) {
poco_warning(KafkaManager()->Logger(),fmt::format("Caught a Kafka exception (producer): {}", E.what())); poco_warning(Logger_,fmt::format("Caught a Kafka exception (producer): {}", E.what()));
} catch( const Poco::Exception &E) { } catch( const Poco::Exception &E) {
KafkaManager()->Logger().log(E); Logger_.log(E);
} catch (...) { } catch (...) {
poco_error(KafkaManager()->Logger(),"std::exception"); poco_error(Logger_,"std::exception");
} }
Note = Queue_.waitDequeueNotification(); Note = Queue_.waitDequeueNotification();
} }
poco_information(Logger_,"Stopped...");
} }
inline void KafkaConsumer::run() { inline void KafkaConsumer::run() {
Utils::SetThreadName("Kafka:Cons"); Utils::SetThreadName("Kafka:Cons");
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-CONSUMER", KafkaManager()->Logger().getChannel());
poco_information(Logger_,"Starting...");
cppkafka::Configuration Config({ cppkafka::Configuration Config({
{ "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id","") }, { "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id","") },
{ "metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist","") }, { "metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist","") },
@@ -134,15 +139,15 @@ namespace OpenWifi {
Config.set_default_topic_configuration(topic_config); Config.set_default_topic_configuration(topic_config);
cppkafka::Consumer Consumer(Config); cppkafka::Consumer Consumer(Config);
Consumer.set_assignment_callback([](cppkafka::TopicPartitionList& partitions) { Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) { if(!partitions.empty()) {
KafkaManager()->Logger().information(fmt::format("Partition assigned: {}...", poco_information(Logger_,fmt::format("Partition assigned: {}...",
partitions.front().get_partition())); partitions.front().get_partition()));
} }
}); });
Consumer.set_revocation_callback([](const cppkafka::TopicPartitionList& partitions) { Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) { if(!partitions.empty()) {
KafkaManager()->Logger().information(fmt::format("Partition revocation: {}...", poco_information(Logger_,fmt::format("Partition revocation: {}...",
partitions.front().get_partition())); partitions.front().get_partition()));
} }
}); });
@@ -163,7 +168,7 @@ namespace OpenWifi {
continue; continue;
if (Msg.get_error()) { if (Msg.get_error()) {
if (!Msg.is_eof()) { if (!Msg.is_eof()) {
poco_error(KafkaManager()->Logger(),fmt::format("Error: {}", Msg.get_error().to_string())); poco_error(Logger_,fmt::format("Error: {}", Msg.get_error().to_string()));
} }
if(!AutoCommit) if(!AutoCommit)
Consumer.async_commit(Msg); Consumer.async_commit(Msg);
@@ -174,14 +179,15 @@ namespace OpenWifi {
Consumer.async_commit(Msg); Consumer.async_commit(Msg);
} }
} catch (const cppkafka::HandleException &E) { } catch (const cppkafka::HandleException &E) {
poco_warning(KafkaManager()->Logger(),fmt::format("Caught a Kafka exception (consumer): {}", E.what())); poco_warning(Logger_,fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
} catch (const Poco::Exception &E) { } catch (const Poco::Exception &E) {
KafkaManager()->Logger().log(E); Logger_.log(E);
} catch (...) { } catch (...) {
poco_error(KafkaManager()->Logger(),"std::exception"); poco_error(Logger_,"std::exception");
} }
} }
Consumer.unsubscribe(); Consumer.unsubscribe();
poco_information(Logger_,"Stopped...");
} }
void KafkaProducer::Start() { void KafkaProducer::Start() {
@@ -269,6 +275,8 @@ namespace OpenWifi {
} }
void KafkaDispatcher::run() { void KafkaDispatcher::run() {
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel());
poco_information(Logger_,"Starting...");
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification()); Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
Utils::SetThreadName("kafka:dispatch"); Utils::SetThreadName("kafka:dispatch");
while(Note && Running_) { while(Note && Running_) {
@@ -284,6 +292,7 @@ namespace OpenWifi {
} }
Note = Queue_.waitDequeueNotification(); Note = Queue_.waitDequeueNotification();
} }
poco_information(Logger_,"Stopped...");
} }
void KafkaDispatcher::Topics(std::vector<std::string> &T) { void KafkaDispatcher::Topics(std::vector<std::string> &T) {
@@ -296,7 +305,7 @@ namespace OpenWifi {
int KafkaManager::Start() { int KafkaManager::Start() {
if(!KafkaEnabled_) if(!KafkaEnabled_)
return 0; return 0;
ConsumerThr_.Start(); ConsumerThr_.Start();
ProducerThr_.Start(); ProducerThr_.Start();
Dispatcher_.Start(); Dispatcher_.Start();
return 0; return 0;
@@ -346,11 +355,11 @@ namespace OpenWifi {
} }
void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) { void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) {
Logger().information(fmt::format("Partition assigned: {}...", partitions.front().get_partition())); poco_information(Logger(),fmt::format("Partition assigned: {}...", partitions.front().get_partition()));
} }
void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList& partitions) { void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList& partitions) {
Logger().information(fmt::format("Partition revocation: {}...",partitions.front().get_partition())); poco_information(Logger(),fmt::format("Partition revocation: {}...",partitions.front().get_partition()));
} }
} // namespace OpenWifi } // namespace OpenWifi

View File

@@ -33,19 +33,18 @@ namespace OpenWifi {
}; };
class KafkaProducer : public Poco::Runnable { class KafkaProducer : public Poco::Runnable {
public: public:
void run () override;
void run () override; void Start();
void Start(); void Stop();
void Stop(); void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
private: private:
std::recursive_mutex Mutex_; std::recursive_mutex Mutex_;
Poco::Thread Worker_; Poco::Thread Worker_;
mutable std::atomic_bool Running_=false; mutable std::atomic_bool Running_=false;
Poco::NotificationQueue Queue_; Poco::NotificationQueue Queue_;
}; };
class KafkaConsumer : public Poco::Runnable { class KafkaConsumer : public Poco::Runnable {
public: public:
@@ -54,14 +53,13 @@ namespace OpenWifi {
void Stop(); void Stop();
private: private:
std::recursive_mutex Mutex_; std::recursive_mutex Mutex_;
Poco::Thread Worker_; Poco::Thread Worker_;
mutable std::atomic_bool Running_=false; mutable std::atomic_bool Running_=false;
}; };
class KafkaDispatcher : public Poco::Runnable { class KafkaDispatcher : public Poco::Runnable {
public: public:
void Start(); void Start();
void Stop(); void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
@@ -104,11 +102,11 @@ namespace OpenWifi {
void Topics(std::vector<std::string> &T); void Topics(std::vector<std::string> &T);
private: private:
bool KafkaEnabled_ = false; bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_; std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_; KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_; KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_; KafkaDispatcher Dispatcher_;
void PartitionAssignment(const cppkafka::TopicPartitionList& partitions); void PartitionAssignment(const cppkafka::TopicPartitionList& partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList& partitions); void PartitionRevocation(const cppkafka::TopicPartitionList& partitions);

View File

@@ -15,38 +15,59 @@
#include "fmt/format.h" #include "fmt/format.h"
#define DBG { std::cout << __LINE__ << std::endl; }
namespace OpenWifi { namespace OpenWifi {
void UI_WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id, const std::string &UserName ) { void UI_WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id, const std::string &UserName ) {
std::lock_guard G(Mutex_);
auto Client = std::make_unique<UI_WebSocketClient>(WS,Id,UserName,Logger(), Processor_); std::lock_guard G(Mutex_);
Clients_[Id] = std::make_pair(std::move(Client),""); auto Client = std::make_unique<UI_WebSocketClientInfo>(WS,Id, UserName);
} auto ClientSocket = Client->WS_->impl()->sockfd();
Client->WS_->setNoDelay(true);
Client->WS_->setKeepAlive(true);
Client->WS_->setBlocking(false);
Reactor_.addEventHandler(*Client->WS_,
Poco::NObserver<UI_WebSocketClientServer, Poco::Net::ReadableNotification>(
*this, &UI_WebSocketClientServer::OnSocketReadable));
Reactor_.addEventHandler(*Client->WS_,
Poco::NObserver<UI_WebSocketClientServer, Poco::Net::ShutdownNotification>(
*this, &UI_WebSocketClientServer::OnSocketShutdown));
Reactor_.addEventHandler(*Client->WS_,
Poco::NObserver<UI_WebSocketClientServer, Poco::Net::ErrorNotification>(
*this, &UI_WebSocketClientServer::OnSocketError));
Client->SocketRegistered_ = true;
Clients_[ClientSocket] = std::move(Client);
}
void UI_WebSocketClientServer::SetProcessor( UI_WebSocketClientProcessor * F) { void UI_WebSocketClientServer::SetProcessor( UI_WebSocketClientProcessor * F) {
Processor_ = F; Processor_ = F;
} }
void UI_WebSocketClientServer::UnRegister(const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_.erase(Id);
}
void UI_WebSocketClientServer::SetUser(const std::string &Id, const std::string &UserId) {
std::lock_guard G(Mutex_);
auto it=Clients_.find(Id);
if(it!=Clients_.end()) {
Clients_[Id] = std::make_pair(std::move(it->second.first),UserId);
}
}
[[nodiscard]] inline bool SendToUser(const std::string &userName, const std::string &Payload);
UI_WebSocketClientServer::UI_WebSocketClientServer() noexcept: UI_WebSocketClientServer::UI_WebSocketClientServer() noexcept:
SubSystemServer("WebSocketClientServer", "UI-WSCLNT-SVR", "websocketclients") SubSystemServer("WebSocketClientServer", "UI-WSCLNT-SVR", "websocketclients")
{ {
} }
void UI_WebSocketClientServer::EndConnection([[maybe_unused]] std::lock_guard<std::recursive_mutex> &G, ClientList::iterator &Client) {
if(Client->second->SocketRegistered_) {
Client->second->SocketRegistered_ = false;
(*Client->second->WS_).shutdown();
Reactor_.removeEventHandler(*Client->second->WS_,
Poco::NObserver<UI_WebSocketClientServer,
Poco::Net::ReadableNotification>(*this,&UI_WebSocketClientServer::OnSocketReadable));
Reactor_.removeEventHandler(*Client->second->WS_,
Poco::NObserver<UI_WebSocketClientServer,
Poco::Net::ShutdownNotification>(*this,&UI_WebSocketClientServer::OnSocketShutdown));
Reactor_.removeEventHandler(*Client->second->WS_,
Poco::NObserver<UI_WebSocketClientServer,
Poco::Net::ErrorNotification>(*this,&UI_WebSocketClientServer::OnSocketError));
}
Clients_.erase(Client);
std::cout << "How many clients: " << Clients_.size() << std::endl;
}
void UI_WebSocketClientServer::run() { void UI_WebSocketClientServer::run() {
Running_ = true ; Running_ = true ;
Utils::SetThreadName("ws:uiclnt-svr"); Utils::SetThreadName("ws:uiclnt-svr");
@@ -59,9 +80,9 @@ namespace OpenWifi {
}; };
int UI_WebSocketClientServer::Start() { int UI_WebSocketClientServer::Start() {
poco_information(Logger(),"Starting...");
GoogleApiKey_ = MicroServiceConfigGetString("google.apikey",""); GoogleApiKey_ = MicroServiceConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty(); GeoCodeEnabled_ = !GoogleApiKey_.empty();
// ReactorPool_ = std::make_unique<MyParallelSocketReactor>();
ReactorThread_.start(Reactor_); ReactorThread_.start(Reactor_);
Thr_.start(*this); Thr_.start(*this);
return 0; return 0;
@@ -69,20 +90,24 @@ namespace OpenWifi {
void UI_WebSocketClientServer::Stop() { void UI_WebSocketClientServer::Stop() {
if(Running_) { if(Running_) {
poco_information(Logger(),"Stopping...");
Clients_.clear();
Reactor_.stop(); Reactor_.stop();
ReactorThread_.join(); ReactorThread_.join();
Running_ = false; Running_ = false;
Thr_.wakeUp(); Thr_.wakeUp();
Thr_.join(); Thr_.join();
poco_information(Logger(),"Stopped...");
} }
}; };
bool UI_WebSocketClientServer::Send(const std::string &Id, const std::string &Payload) { bool UI_WebSocketClientServer::SendToId(const std::string &Id, const std::string &Payload) {
std::lock_guard G(Mutex_); std::lock_guard G(Mutex_);
auto It = Clients_.find(Id); for(const auto &Client:Clients_) {
if(It!=Clients_.end()) if(Client.second->Id_==Id)
return It->second.first->Send(Payload); return Client.second->WS_->sendFrame(Payload.c_str(),(int)Payload.size());
}
return false; return false;
} }
@@ -91,9 +116,9 @@ namespace OpenWifi {
uint64_t Sent=0; uint64_t Sent=0;
for(const auto &client:Clients_) { for(const auto &client:Clients_) {
if(client.second.second == UserName) { if(client.second->UserName_ == UserName) {
try { try {
if (client.second.first->Send(Payload)) if (client.second->WS_->sendFrame(Payload.c_str(),(int)Payload.size()))
Sent++; Sent++;
} catch (...) { } catch (...) {
return false; return false;
@@ -108,159 +133,119 @@ namespace OpenWifi {
for(const auto &client:Clients_) { for(const auto &client:Clients_) {
try { try {
client.second.first->Send(Payload); client.second->WS_->sendFrame(Payload.c_str(),(int)Payload.size());
} catch (...) { } catch (...) {
} }
} }
} }
void UI_WebSocketClient::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) { UI_WebSocketClientServer::ClientList::iterator UI_WebSocketClientServer::FindWSClient( [[maybe_unused]] std::lock_guard<std::recursive_mutex> &G, int ClientSocket) {
EndConnection(); return Clients_.find(ClientSocket);
}
void UI_WebSocketClientServer::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
std::lock_guard G(LocalMutex_);
auto Client = FindWSClient(G,pNf->socket().impl()->sockfd());
if(Client==end(Clients_))
return;
EndConnection(G,Client);
} }
void UI_WebSocketClient::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) { void UI_WebSocketClientServer::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) {
int flags;
int n; UI_WebSocketClientServer::ClientList::iterator Client;
bool Done=false;
std::lock_guard G(LocalMutex_);
try { try {
Client = FindWSClient(G,pNf->socket().impl()->sockfd());
if( Client == end(Clients_))
return;
Poco::Buffer<char> IncomingFrame(0); Poco::Buffer<char> IncomingFrame(0);
n = WS_->receiveFrame(IncomingFrame, flags); int flags;
int n;
n = Client->second->WS_->receiveFrame(IncomingFrame, flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if (n == 0) { if (n == 0) {
poco_debug(Logger(),fmt::format("CLOSE({}): {} UI Client is closing WS connection.", Id_, UserName_)); poco_debug(Logger(),fmt::format("CLOSE({}): {} UI Client is closing WS connection.", Client->second->Id_, Client->second->UserName_));
return EndConnection(); return EndConnection(G, Client);
} }
switch (Op) { switch (Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: { case Poco::Net::WebSocket::FRAME_OP_PING: {
WS_->sendFrame("", 0, Client->second->WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG | (int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN); (int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
} break; } break;
case Poco::Net::WebSocket::FRAME_OP_PONG: { case Poco::Net::WebSocket::FRAME_OP_PONG: {
} break; } break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: { case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
poco_debug(Logger(),fmt::format("CLOSE({}): {} UI Client is closing WS connection.", Id_, UserName_)); poco_debug(Logger(),fmt::format("CLOSE({}): {} UI Client is closing WS connection.", Client->second->Id_, Client->second->UserName_));
Done = true; return EndConnection(G, Client);
} break; } break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: { case Poco::Net::WebSocket::FRAME_OP_TEXT: {
IncomingFrame.append(0); IncomingFrame.append(0);
if (!Authenticated_) { if (!Client->second->Authenticated_) {
std::string Frame{IncomingFrame.begin()}; std::string Frame{IncomingFrame.begin()};
auto Tokens = Utils::Split(Frame, ':'); auto Tokens = Utils::Split(Frame, ':');
bool Expired = false, Contacted = false; bool Expired = false, Contacted = false;
if (Tokens.size() == 2 && if (Tokens.size() == 2 &&
AuthClient()->IsAuthorized(Tokens[1], UserInfo_, 0, Expired, Contacted)) { AuthClient()->IsAuthorized(Tokens[1], Client->second->UserInfo_, 0, Expired, Contacted)) {
Authenticated_ = true; Client->second->Authenticated_ = true;
UserName_ = UserInfo_.userinfo.email; Client->second->UserName_ = Client->second->UserInfo_.userinfo.email;
poco_debug(Logger(),fmt::format("START({}): {} UI Client is starting WS connection.", Id_, UserName_)); poco_debug(Logger(),fmt::format("START({}): {} UI Client is starting WS connection.", Client->second->Id_, Client->second->UserName_));
std::string S{"Welcome! Bienvenue! Bienvenidos!"}; std::string S{"Welcome! Bienvenue! Bienvenidos!"};
WS_->sendFrame(S.c_str(), S.size()); Client->second->WS_->sendFrame(S.c_str(), S.size());
UI_WebSocketClientServer()->SetUser(Id_, UserInfo_.userinfo.email); Client->second->UserName_ = Client->second->UserInfo_.userinfo.email;
} else { } else {
std::string S{"Invalid token. Closing connection."}; std::string S{"Invalid token. Closing connection."};
WS_->sendFrame(S.c_str(), S.size()); Client->second->WS_->sendFrame(S.c_str(), S.size());
Done = true; return EndConnection(G, Client);
} }
} else { } else {
try { Poco::JSON::Parser P;
Poco::JSON::Parser P; auto Obj =
auto Obj = P.parse(IncomingFrame.begin()).extract<Poco::JSON::Object::Ptr>();
P.parse(IncomingFrame.begin()).extract<Poco::JSON::Object::Ptr>(); std::string Answer;
std::string Answer; bool CloseConnection=false;
if (Processor_ != nullptr) if (Processor_ != nullptr) {
Processor_->Processor(Obj, Answer, Done); Processor_->Processor(Obj, Answer, CloseConnection);
if (!Answer.empty()) }
WS_->sendFrame(Answer.c_str(), (int)Answer.size()); if (!Answer.empty())
else { Client->second->WS_->sendFrame(Answer.c_str(), (int)Answer.size());
WS_->sendFrame("{}", 2); else {
} Client->second->WS_->sendFrame("{}", 2);
} catch (const Poco::JSON::JSONException &E) { }
Logger().log(E);
Done=true; if(CloseConnection) {
} return EndConnection(G, Client);
}
} }
} break; } break;
default: { default: {
} }
} }
} catch (...) { } catch (...) {
Done=true; return EndConnection(G, Client);
}
if(Done) {
return EndConnection();
} }
} }
void UI_WebSocketClient::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) { void UI_WebSocketClientServer::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
EndConnection(); ClientList::iterator Client;
std::lock_guard G(LocalMutex_);
try {
Client = FindWSClient(G, pNf->socket().impl()->sockfd());
if (Client == end(Clients_))
return;
EndConnection(G, Client);
} catch (...) {
}
} }
UI_WebSocketClient::UI_WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, const std::string &UserName, Poco::Logger & L, UI_WebSocketClientProcessor * Processor) :
Reactor_(UI_WebSocketClientServer()->Reactor()),
Id_(Id),
UserName_(UserName),
Logger_(L),
Processor_(Processor) {
WS_ = std::make_unique<Poco::Net::WebSocket>(WS);
WS_->setNoDelay(true);
WS_->setKeepAlive(true);
WS_->setBlocking(false);
Reactor_.addEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient, Poco::Net::ReadableNotification>(
*this, &UI_WebSocketClient::OnSocketReadable));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient, Poco::Net::ShutdownNotification>(
*this, &UI_WebSocketClient::OnSocketShutdown));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient, Poco::Net::ErrorNotification>(
*this, &UI_WebSocketClient::OnSocketError));
SocketRegistered_ = true;
}
void UI_WebSocketClient::EndConnection() {
if(SocketRegistered_) {
SocketRegistered_ = false;
(*WS_).shutdown();
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient,
Poco::Net::ReadableNotification>(*this,&UI_WebSocketClient::OnSocketReadable));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient,
Poco::Net::ShutdownNotification>(*this,&UI_WebSocketClient::OnSocketShutdown));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<UI_WebSocketClient,
Poco::Net::ErrorNotification>(*this,&UI_WebSocketClient::OnSocketError));
UI_WebSocketClientServer()->UnRegister(Id_);
}
}
UI_WebSocketClient::~UI_WebSocketClient() {
EndConnection();
}
[[nodiscard]] const std::string & UI_WebSocketClient::Id() {
return Id_;
};
[[nodiscard]] Poco::Logger & UI_WebSocketClient::Logger() {
return Logger_;
}
[[nodiscard]] bool UI_WebSocketClient::Send(const std::string &Payload) {
try {
WS_->sendFrame(Payload.c_str(),Payload.size());
return true;
} catch (...) {
}
return false;
}
} // namespace OpenWifi } // namespace OpenWifi

View File

@@ -27,6 +27,21 @@ namespace OpenWifi {
private: private:
}; };
struct UI_WebSocketClientInfo {
std::unique_ptr<Poco::Net::WebSocket> WS_ = nullptr;
std::string Id_;
std::string UserName_;
bool Authenticated_ = false;
bool SocketRegistered_=false;
SecurityObjects::UserInfoAndPolicy UserInfo_;
UI_WebSocketClientInfo(Poco::Net::WebSocket &WS, const std::string &Id, const std::string &username) {
WS_ = std::make_unique<Poco::Net::WebSocket>(WS);
Id_ = Id;
UserName_ = username;
}
};
class UI_WebSocketClientServer : public SubSystemServer, Poco::Runnable { class UI_WebSocketClientServer : public SubSystemServer, Poco::Runnable {
public: public:
@@ -41,11 +56,8 @@ namespace OpenWifi {
Poco::Net::SocketReactor & Reactor() { return Reactor_; } Poco::Net::SocketReactor & Reactor() { return Reactor_; }
void NewClient(Poco::Net::WebSocket &WS, const std::string &Id, const std::string &UserName); void NewClient(Poco::Net::WebSocket &WS, const std::string &Id, const std::string &UserName);
void SetProcessor(UI_WebSocketClientProcessor *F); void SetProcessor(UI_WebSocketClientProcessor *F);
void UnRegister(const std::string &Id);
void SetUser(const std::string &Id, const std::string &UserId);
[[nodiscard]] inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; } [[nodiscard]] inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; }
[[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; } [[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; }
[[nodiscard]] bool Send(const std::string &Id, const std::string &Payload);
template <typename T> bool template <typename T> bool
SendUserNotification(const std::string &userName, const WebSocketNotification<T> &Notification) { SendUserNotification(const std::string &userName, const WebSocketNotification<T> &Notification) {
@@ -70,49 +82,34 @@ namespace OpenWifi {
SendToAll(OO.str()); SendToAll(OO.str());
} }
[[nodiscard]] bool SendToId(const std::string &Id, const std::string &Payload);
[[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload); [[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload);
void SendToAll(const std::string &Payload); void SendToAll(const std::string &Payload);
private: using ClientList = std::map<int,std::unique_ptr<UI_WebSocketClientInfo>>;
private:
mutable std::atomic_bool Running_ = false; mutable std::atomic_bool Running_ = false;
Poco::Thread Thr_; Poco::Thread Thr_;
Poco::Net::SocketReactor Reactor_; Poco::Net::SocketReactor Reactor_;
Poco::Thread ReactorThread_; Poco::Thread ReactorThread_;
std::recursive_mutex LocalMutex_;
bool GeoCodeEnabled_ = false; bool GeoCodeEnabled_ = false;
std::string GoogleApiKey_; std::string GoogleApiKey_;
std::map<std::string, std::pair<std::unique_ptr<UI_WebSocketClient>, std::string>> Clients_; ClientList Clients_;
UI_WebSocketClientProcessor *Processor_ = nullptr; UI_WebSocketClientProcessor *Processor_ = nullptr;
UI_WebSocketClientServer() noexcept; UI_WebSocketClientServer() noexcept;
void EndConnection(std::lock_guard<std::recursive_mutex> &G, ClientList::iterator & Client);
void OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf);
void OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf);
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf);
ClientList::iterator FindWSClient( std::lock_guard<std::recursive_mutex> &G, int ClientSocket);
}; };
inline auto UI_WebSocketClientServer() { return UI_WebSocketClientServer::instance(); } inline auto UI_WebSocketClientServer() { return UI_WebSocketClientServer::instance(); }
class UI_WebSocketClient {
public:
explicit UI_WebSocketClient(Poco::Net::WebSocket &WS,
const std::string &Id,
const std::string &UserName,
Poco::Logger &L,
UI_WebSocketClientProcessor *Processor);
virtual ~UI_WebSocketClient();
[[nodiscard]] inline const std::string &Id();
[[nodiscard]] Poco::Logger &Logger();
bool Send(const std::string &Payload);
void EndConnection();
private:
std::unique_ptr<Poco::Net::WebSocket> WS_;
Poco::Net::SocketReactor &Reactor_;
std::string Id_;
std::string UserName_;
Poco::Logger &Logger_;
std::atomic_bool Authenticated_ = false;
volatile bool SocketRegistered_=false;
SecurityObjects::UserInfoAndPolicy UserInfo_;
UI_WebSocketClientProcessor *Processor_ = nullptr;
void OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf);
void OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf);
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf);
};
}; };