From 83e957fd2e97bfddbaefda8d9045fa35c50bb046 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Fri, 29 Apr 2022 15:22:44 -0700 Subject: [PATCH] Framework update --- src/framework/MicroService.h | 435 ++++++++++++++++++++++++++++++++++- 1 file changed, 434 insertions(+), 1 deletion(-) diff --git a/src/framework/MicroService.h b/src/framework/MicroService.h index 3586bc3..a335276 100644 --- a/src/framework/MicroService.h +++ b/src/framework/MicroService.h @@ -73,6 +73,11 @@ using namespace std::chrono_literals; #include "Poco/SplitterChannel.h" #include "Poco/JWT/Signer.h" #include "Poco/DeflatingStream.h" +#include "Poco/Net/SocketReactor.h" +#include "Poco/Net/WebSocket.h" +#include "Poco/Environment.h" +#include "Poco/NObserver.h" +#include "Poco/Net/SocketNotification.h" #include "cppkafka/cppkafka.h" @@ -104,7 +109,8 @@ namespace OpenWifi { RATE_LIMIT_EXCEEDED, BAD_MFA_TRANSACTION, MFA_FAILURE, - SECURITY_SERVICE_UNREACHABLE + SECURITY_SERVICE_UNREACHABLE, + CANNOT_REFRESH_TOKEN }; class AppServiceRegistry { @@ -4560,6 +4566,433 @@ namespace OpenWifi { } inline MicroService * MicroService::instance_ = nullptr; + + template struct WebSocketNotification { + inline static uint64_t xid=1; + uint64_t notification_id=++xid; + std::string type; + ContentStruct content; + + void to_json(Poco::JSON::Object &Obj) const; + bool from_json(const Poco::JSON::Object::Ptr &Obj); + }; + + template void WebSocketNotification::to_json(Poco::JSON::Object &Obj) const { + RESTAPI_utils::field_to_json(Obj,"notification_id",notification_id); + RESTAPI_utils::field_to_json(Obj,"type",type); + RESTAPI_utils::field_to_json(Obj,"content",content); + } + + template bool WebSocketNotification::from_json(const Poco::JSON::Object::Ptr &Obj) { + try { + RESTAPI_utils::field_from_json(Obj,"notification_id",notification_id); + RESTAPI_utils::field_from_json(Obj,"content",content); + RESTAPI_utils::field_from_json(Obj,"type",type); + return true; + } catch(...) { + + } + return false; + } + + class WebSocketClientProcessor { + public: + virtual void Processor(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done ) = 0; + private: + }; + + class MyParallelSocketReactor { + public: + explicit MyParallelSocketReactor(uint32_t NumReactors = 8); + ~MyParallelSocketReactor(); + Poco::Net::SocketReactor &Reactor(); + private: + uint32_t NumReactors_; + Poco::Net::SocketReactor *Reactors_; + Poco::ThreadPool ReactorPool_; + }; + + class WebSocketClient; + + class WebSocketClientServer : public SubSystemServer, Poco::Runnable { + public: + static auto instance() { + static auto instance_ = new WebSocketClientServer; + return instance_; + } + + int Start() override; + void Stop() override; + void run() override; + MyParallelSocketReactor &ReactorPool(); + void NewClient(Poco::Net::WebSocket &WS, const std::string &Id); + bool Register(WebSocketClient *Client, const std::string &Id); + void SetProcessor(WebSocketClientProcessor *F); + void UnRegister(const std::string &Id); + void SetUser(const std::string &Id, const std::string &UserId); + [[nodiscard]] inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; } + [[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; } + [[nodiscard]] bool Send(const std::string &Id, const std::string &Payload); + + template bool + SendUserNotification(const std::string &userName, const WebSocketNotification &Notification) { + + Poco::JSON::Object Payload; + Notification.to_json(Payload); + Poco::JSON::Object Msg; + Msg.set("notification",Payload); + std::ostringstream OO; + Msg.stringify(OO); + + return SendToUser(userName,OO.str()); + } + + template void SendNotification(const WebSocketNotification &Notification) { + Poco::JSON::Object Payload; + Notification.to_json(Payload); + Poco::JSON::Object Msg; + Msg.set("notification",Payload); + std::ostringstream OO; + Msg.stringify(OO); + SendToAll(OO.str()); + } + + [[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload); + void SendToAll(const std::string &Payload); + private: + std::atomic_bool Running_ = false; + Poco::Thread Thr_; + std::unique_ptr ReactorPool_; + bool GeoCodeEnabled_ = false; + std::string GoogleApiKey_; + std::map> Clients_; + WebSocketClientProcessor *Processor_ = nullptr; + WebSocketClientServer() noexcept; + }; + + inline auto WebSocketClientServer() { return WebSocketClientServer::instance(); } + + class WebSocketClient { + public: + explicit WebSocketClient(Poco::Net::WebSocket &WS, const std::string &Id, Poco::Logger &L, + WebSocketClientProcessor *Processor); + virtual ~WebSocketClient(); + [[nodiscard]] inline const std::string &Id(); + [[nodiscard]] Poco::Logger &Logger(); + inline bool Send(const std::string &Payload); + private: + std::unique_ptr WS_; + Poco::Net::SocketReactor &Reactor_; + std::string Id_; + Poco::Logger &Logger_; + bool Authenticated_ = false; + SecurityObjects::UserInfoAndPolicy UserInfo_; + WebSocketClientProcessor *Processor_ = nullptr; + void OnSocketReadable(const Poco::AutoPtr &pNf); + void OnSocketShutdown(const Poco::AutoPtr &pNf); + void OnSocketError(const Poco::AutoPtr &pNf); + }; + + inline MyParallelSocketReactor::MyParallelSocketReactor(uint32_t NumReactors) : + NumReactors_(NumReactors) + { + Reactors_ = new Poco::Net::SocketReactor[NumReactors_]; + for(uint32_t i=0;isecond.first,UserId); + } + } + + [[nodiscard]] inline bool SendToUser(const std::string &userName, const std::string &Payload); + inline WebSocketClientServer::WebSocketClientServer() noexcept: + SubSystemServer("WebSocketClientServer", "WSCLNT-SVR", "websocketclients") + { + } + + inline void WebSocketClientServer::run() { + Running_ = true ; + while(Running_) { + Poco::Thread::trySleep(2000); + + if(!Running_) + break; + } + }; + + inline int WebSocketClientServer::Start() { + GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey",""); + GeoCodeEnabled_ = !GoogleApiKey_.empty(); + ReactorPool_ = std::make_unique(); + Thr_.start(*this); + return 0; + }; + + inline void WebSocketClientServer::Stop() { + if(Running_) { + Running_ = false; + Thr_.wakeUp(); + Thr_.join(); + } + }; + + + inline void WebSocketClient::OnSocketError([[maybe_unused]] const Poco::AutoPtr &pNf) { + delete this; + } + + inline bool WebSocketClientServer::Send(const std::string &Id, const std::string &Payload) { + std::lock_guard G(Mutex_); + + auto It = Clients_.find(Id); + if(It!=Clients_.end()) + return It->second.first->Send(Payload); + return false; + } + + inline bool WebSocketClientServer::SendToUser(const std::string &UserName, const std::string &Payload) { + std::lock_guard G(Mutex_); + uint64_t Sent=0; + + for(const auto &client:Clients_) { + if(client.second.second == UserName) { + if(client.second.first->Send(Payload)) + Sent++; + } + } + return Sent>0; + } + + inline void WebSocketClientServer::SendToAll(const std::string &Payload) { + std::lock_guard G(Mutex_); + + for(const auto &client:Clients_) { + try { + client.second.first->Send(Payload); + } catch (...) { + + } + } + } + + inline void WebSocketClient::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr &pNf) { + int flags; + int n; + bool Done=false; + Poco::Buffer IncomingFrame(0); + n = WS_->receiveFrame(IncomingFrame, flags); + auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; + + if(n==0) { + return delete this; + } + + switch(Op) { + case Poco::Net::WebSocket::FRAME_OP_PING: { + WS_->sendFrame("", 0, + (int)Poco::Net::WebSocket::FRAME_OP_PONG | + (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); + } + break; + case Poco::Net::WebSocket::FRAME_OP_PONG: { + } + break; + case Poco::Net::WebSocket::FRAME_OP_CLOSE: { + Logger().warning(Poco::format("CLOSE(%s): Client is closing its connection.",Id_)); + Done=true; + } + break; + case Poco::Net::WebSocket::FRAME_OP_TEXT: { + IncomingFrame.append(0); + if(!Authenticated_) { + std::string Frame{IncomingFrame.begin()}; + auto Tokens = Utils::Split(Frame,':'); + bool Expired = false, Contacted = false; + if(Tokens.size()==2 && AuthClient()->IsAuthorized(Tokens[1], UserInfo_, Expired, Contacted)) { + Authenticated_=true; + std::string S{"Welcome! Bienvenue! Bienvenidos!"}; + WS_->sendFrame(S.c_str(),S.size()); + WebSocketClientServer()->SetUser(Id_,UserInfo_.userinfo.email); + } else { + std::string S{"Invalid token. Closing connection."}; + WS_->sendFrame(S.c_str(),S.size()); + Done=true; + } + + } else { + try { + Poco::JSON::Parser P; + auto Obj = P.parse(IncomingFrame.begin()) + .extract(); + std::string Answer; + if(Processor_!= nullptr) + Processor_->Processor(Obj, Answer, Done); + if (!Answer.empty()) + WS_->sendFrame(Answer.c_str(), (int) Answer.size()); + else { + WS_->sendFrame("{}", 2); + } + } catch (const Poco::JSON::JSONException & E) { + Logger().log(E); + } + } + } + break; + default: + { + + } + } + + if(Done) { + delete this; + } + } + + inline void WebSocketClient::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr &pNf) { + delete this; + } + + + inline WebSocketClient::WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L, WebSocketClientProcessor * Processor) : + Reactor_(WebSocketClientServer()->ReactorPool().Reactor()), + Id_(Id), + Logger_(L), + Processor_(Processor) { + try { + WS_ = std::make_unique(WS); + Reactor_.addEventHandler(*WS_, + Poco::NObserver( + *this, &WebSocketClient::OnSocketReadable)); + Reactor_.addEventHandler(*WS_, + Poco::NObserver( + *this, &WebSocketClient::OnSocketShutdown)); + Reactor_.addEventHandler(*WS_, + Poco::NObserver( + *this, &WebSocketClient::OnSocketError)); + // WebSocketClientServer()->Register(this, Id_); + } catch (...) { + delete this; + } + } + + inline WebSocketClient::~WebSocketClient() { + try { + WebSocketClientServer()->UnRegister(Id_); + Reactor_.removeEventHandler(*WS_, + Poco::NObserver(*this,&WebSocketClient::OnSocketReadable)); + Reactor_.removeEventHandler(*WS_, + Poco::NObserver(*this,&WebSocketClient::OnSocketShutdown)); + Reactor_.removeEventHandler(*WS_, + Poco::NObserver(*this,&WebSocketClient::OnSocketError)); + (*WS_).shutdown(); + (*WS_).close(); + WebSocketClientServer()->UnRegister(Id_); + } catch(...) { + + } + } + + [[nodiscard]] inline const std::string & WebSocketClient::Id() { + return Id_; + }; + + [[nodiscard]] inline Poco::Logger & WebSocketClient::Logger() { + return Logger_; + } + + [[nodiscard]] inline bool WebSocketClient::Send(const std::string &Payload) { + try { + WS_->sendFrame(Payload.c_str(),Payload.size()); + return true; + } catch (...) { + + } + return false; + } + + class RESTAPI_webSocketServer : public RESTAPIHandler { + public: + inline RESTAPI_webSocketServer(const RESTAPIHandler::BindingMap &bindings, Poco::Logger &L, RESTAPI_GenericServer &Server, uint64_t TransactionId, bool Internal) + : RESTAPIHandler(bindings, L, + std::vector{ Poco::Net::HTTPRequest::HTTP_GET, + Poco::Net::HTTPRequest::HTTP_OPTIONS}, + Server, TransactionId, Internal,false) {} + static auto PathName() { return std::list{"/api/v1/ws"};} + void DoGet() final; + void DoDelete() final {}; + void DoPost() final {}; + void DoPut() final {}; + private: + }; + + inline void RESTAPI_webSocketServer::DoGet() { + try + { + if(Request->find("Upgrade") != Request->end() && Poco::icompare((*Request)["Upgrade"], "websocket") == 0) { + try + { + Poco::Net::WebSocket WS(*Request, *Response); + Logger().information("WebSocket connection established."); + auto Id = MicroService::CreateUUID(); + WebSocketClientServer()->NewClient(WS,Id); + } + catch (...) { + std::cout << "Cannot create websocket client..." << std::endl; + } + } + } catch(...) { + std::cout << "Cannot upgrade connection..." << std::endl; + } + } + + } namespace OpenWifi::Utils {