Framework update

This commit is contained in:
stephb9959
2022-04-29 15:22:44 -07:00
parent 3cc000c4c3
commit 83e957fd2e

View File

@@ -73,6 +73,11 @@ using namespace std::chrono_literals;
#include "Poco/SplitterChannel.h"
#include "Poco/JWT/Signer.h"
#include "Poco/DeflatingStream.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/WebSocket.h"
#include "Poco/Environment.h"
#include "Poco/NObserver.h"
#include "Poco/Net/SocketNotification.h"
#include "cppkafka/cppkafka.h"
@@ -104,7 +109,8 @@ namespace OpenWifi {
RATE_LIMIT_EXCEEDED,
BAD_MFA_TRANSACTION,
MFA_FAILURE,
SECURITY_SERVICE_UNREACHABLE
SECURITY_SERVICE_UNREACHABLE,
CANNOT_REFRESH_TOKEN
};
class AppServiceRegistry {
@@ -4560,6 +4566,433 @@ namespace OpenWifi {
}
inline MicroService * MicroService::instance_ = nullptr;
template <typename ContentStruct> struct WebSocketNotification {
inline static uint64_t xid=1;
uint64_t notification_id=++xid;
std::string type;
ContentStruct content;
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
template <typename ContentStruct> void WebSocketNotification<ContentStruct>::to_json(Poco::JSON::Object &Obj) const {
RESTAPI_utils::field_to_json(Obj,"notification_id",notification_id);
RESTAPI_utils::field_to_json(Obj,"type",type);
RESTAPI_utils::field_to_json(Obj,"content",content);
}
template <typename ContentStruct> bool WebSocketNotification<ContentStruct>::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
RESTAPI_utils::field_from_json(Obj,"notification_id",notification_id);
RESTAPI_utils::field_from_json(Obj,"content",content);
RESTAPI_utils::field_from_json(Obj,"type",type);
return true;
} catch(...) {
}
return false;
}
class WebSocketClientProcessor {
public:
virtual void Processor(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done ) = 0;
private:
};
class MyParallelSocketReactor {
public:
explicit MyParallelSocketReactor(uint32_t NumReactors = 8);
~MyParallelSocketReactor();
Poco::Net::SocketReactor &Reactor();
private:
uint32_t NumReactors_;
Poco::Net::SocketReactor *Reactors_;
Poco::ThreadPool ReactorPool_;
};
class WebSocketClient;
class WebSocketClientServer : public SubSystemServer, Poco::Runnable {
public:
static auto instance() {
static auto instance_ = new WebSocketClientServer;
return instance_;
}
int Start() override;
void Stop() override;
void run() override;
MyParallelSocketReactor &ReactorPool();
void NewClient(Poco::Net::WebSocket &WS, const std::string &Id);
bool Register(WebSocketClient *Client, const std::string &Id);
void SetProcessor(WebSocketClientProcessor *F);
void UnRegister(const std::string &Id);
void SetUser(const std::string &Id, const std::string &UserId);
[[nodiscard]] inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; }
[[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; }
[[nodiscard]] bool Send(const std::string &Id, const std::string &Payload);
template <typename T> bool
SendUserNotification(const std::string &userName, const WebSocketNotification<T> &Notification) {
Poco::JSON::Object Payload;
Notification.to_json(Payload);
Poco::JSON::Object Msg;
Msg.set("notification",Payload);
std::ostringstream OO;
Msg.stringify(OO);
return SendToUser(userName,OO.str());
}
template <typename T> void SendNotification(const WebSocketNotification<T> &Notification) {
Poco::JSON::Object Payload;
Notification.to_json(Payload);
Poco::JSON::Object Msg;
Msg.set("notification",Payload);
std::ostringstream OO;
Msg.stringify(OO);
SendToAll(OO.str());
}
[[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload);
void SendToAll(const std::string &Payload);
private:
std::atomic_bool Running_ = false;
Poco::Thread Thr_;
std::unique_ptr<MyParallelSocketReactor> ReactorPool_;
bool GeoCodeEnabled_ = false;
std::string GoogleApiKey_;
std::map<std::string, std::pair<WebSocketClient *, std::string>> Clients_;
WebSocketClientProcessor *Processor_ = nullptr;
WebSocketClientServer() noexcept;
};
inline auto WebSocketClientServer() { return WebSocketClientServer::instance(); }
class WebSocketClient {
public:
explicit WebSocketClient(Poco::Net::WebSocket &WS, const std::string &Id, Poco::Logger &L,
WebSocketClientProcessor *Processor);
virtual ~WebSocketClient();
[[nodiscard]] inline const std::string &Id();
[[nodiscard]] Poco::Logger &Logger();
inline bool Send(const std::string &Payload);
private:
std::unique_ptr<Poco::Net::WebSocket> WS_;
Poco::Net::SocketReactor &Reactor_;
std::string Id_;
Poco::Logger &Logger_;
bool Authenticated_ = false;
SecurityObjects::UserInfoAndPolicy UserInfo_;
WebSocketClientProcessor *Processor_ = nullptr;
void OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf);
void OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf);
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf);
};
inline MyParallelSocketReactor::MyParallelSocketReactor(uint32_t NumReactors) :
NumReactors_(NumReactors)
{
Reactors_ = new Poco::Net::SocketReactor[NumReactors_];
for(uint32_t i=0;i<NumReactors_;i++) {
ReactorPool_.start(Reactors_[i]);
}
}
inline MyParallelSocketReactor::~MyParallelSocketReactor() {
for(uint32_t i=0;i<NumReactors_;i++) {
Reactors_[i].stop();
}
ReactorPool_.stopAll();
ReactorPool_.joinAll();
delete [] Reactors_;
}
inline Poco::Net::SocketReactor & MyParallelSocketReactor::Reactor() {
return Reactors_[ rand() % NumReactors_ ];
}
inline MyParallelSocketReactor & WebSocketClientServer::ReactorPool() { return *ReactorPool_; }
inline void WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id) {
std::lock_guard G(Mutex_);
auto Client = new WebSocketClient(WS,Id,Logger(), Processor_);
Clients_[Id] = std::make_pair(Client,"");
}
inline bool WebSocketClientServer::Register( WebSocketClient * Client, const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_[Id] = std::make_pair(Client,"");
return true;
}
inline void WebSocketClientServer::SetProcessor( WebSocketClientProcessor * F) {
Processor_ = F;
}
inline void WebSocketClientServer::UnRegister(const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_.erase(Id);
}
inline void WebSocketClientServer::SetUser(const std::string &Id, const std::string &UserId) {
std::lock_guard G(Mutex_);
auto it=Clients_.find(Id);
if(it!=Clients_.end()) {
Clients_[Id] = std::make_pair(it->second.first,UserId);
}
}
[[nodiscard]] inline bool SendToUser(const std::string &userName, const std::string &Payload);
inline WebSocketClientServer::WebSocketClientServer() noexcept:
SubSystemServer("WebSocketClientServer", "WSCLNT-SVR", "websocketclients")
{
}
inline void WebSocketClientServer::run() {
Running_ = true ;
while(Running_) {
Poco::Thread::trySleep(2000);
if(!Running_)
break;
}
};
inline int WebSocketClientServer::Start() {
GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty();
ReactorPool_ = std::make_unique<MyParallelSocketReactor>();
Thr_.start(*this);
return 0;
};
inline void WebSocketClientServer::Stop() {
if(Running_) {
Running_ = false;
Thr_.wakeUp();
Thr_.join();
}
};
inline void WebSocketClient::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
delete this;
}
inline bool WebSocketClientServer::Send(const std::string &Id, const std::string &Payload) {
std::lock_guard G(Mutex_);
auto It = Clients_.find(Id);
if(It!=Clients_.end())
return It->second.first->Send(Payload);
return false;
}
inline bool WebSocketClientServer::SendToUser(const std::string &UserName, const std::string &Payload) {
std::lock_guard G(Mutex_);
uint64_t Sent=0;
for(const auto &client:Clients_) {
if(client.second.second == UserName) {
if(client.second.first->Send(Payload))
Sent++;
}
}
return Sent>0;
}
inline void WebSocketClientServer::SendToAll(const std::string &Payload) {
std::lock_guard G(Mutex_);
for(const auto &client:Clients_) {
try {
client.second.first->Send(Payload);
} catch (...) {
}
}
}
inline void WebSocketClient::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) {
int flags;
int n;
bool Done=false;
Poco::Buffer<char> IncomingFrame(0);
n = WS_->receiveFrame(IncomingFrame, flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if(n==0) {
return delete this;
}
switch(Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: {
WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
}
break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
}
break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
Logger().warning(Poco::format("CLOSE(%s): Client is closing its connection.",Id_));
Done=true;
}
break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
IncomingFrame.append(0);
if(!Authenticated_) {
std::string Frame{IncomingFrame.begin()};
auto Tokens = Utils::Split(Frame,':');
bool Expired = false, Contacted = false;
if(Tokens.size()==2 && AuthClient()->IsAuthorized(Tokens[1], UserInfo_, Expired, Contacted)) {
Authenticated_=true;
std::string S{"Welcome! Bienvenue! Bienvenidos!"};
WS_->sendFrame(S.c_str(),S.size());
WebSocketClientServer()->SetUser(Id_,UserInfo_.userinfo.email);
} else {
std::string S{"Invalid token. Closing connection."};
WS_->sendFrame(S.c_str(),S.size());
Done=true;
}
} else {
try {
Poco::JSON::Parser P;
auto Obj = P.parse(IncomingFrame.begin())
.extract<Poco::JSON::Object::Ptr>();
std::string Answer;
if(Processor_!= nullptr)
Processor_->Processor(Obj, Answer, Done);
if (!Answer.empty())
WS_->sendFrame(Answer.c_str(), (int) Answer.size());
else {
WS_->sendFrame("{}", 2);
}
} catch (const Poco::JSON::JSONException & E) {
Logger().log(E);
}
}
}
break;
default:
{
}
}
if(Done) {
delete this;
}
}
inline void WebSocketClient::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
delete this;
}
inline WebSocketClient::WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L, WebSocketClientProcessor * Processor) :
Reactor_(WebSocketClientServer()->ReactorPool().Reactor()),
Id_(Id),
Logger_(L),
Processor_(Processor) {
try {
WS_ = std::make_unique<Poco::Net::WebSocket>(WS);
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ReadableNotification>(
*this, &WebSocketClient::OnSocketReadable));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ShutdownNotification>(
*this, &WebSocketClient::OnSocketShutdown));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ErrorNotification>(
*this, &WebSocketClient::OnSocketError));
// WebSocketClientServer()->Register(this, Id_);
} catch (...) {
delete this;
}
}
inline WebSocketClient::~WebSocketClient() {
try {
WebSocketClientServer()->UnRegister(Id_);
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ReadableNotification>(*this,&WebSocketClient::OnSocketReadable));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ShutdownNotification>(*this,&WebSocketClient::OnSocketShutdown));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ErrorNotification>(*this,&WebSocketClient::OnSocketError));
(*WS_).shutdown();
(*WS_).close();
WebSocketClientServer()->UnRegister(Id_);
} catch(...) {
}
}
[[nodiscard]] inline const std::string & WebSocketClient::Id() {
return Id_;
};
[[nodiscard]] inline Poco::Logger & WebSocketClient::Logger() {
return Logger_;
}
[[nodiscard]] inline bool WebSocketClient::Send(const std::string &Payload) {
try {
WS_->sendFrame(Payload.c_str(),Payload.size());
return true;
} catch (...) {
}
return false;
}
class RESTAPI_webSocketServer : public RESTAPIHandler {
public:
inline RESTAPI_webSocketServer(const RESTAPIHandler::BindingMap &bindings, Poco::Logger &L, RESTAPI_GenericServer &Server, uint64_t TransactionId, bool Internal)
: RESTAPIHandler(bindings, L,
std::vector<std::string>{ Poco::Net::HTTPRequest::HTTP_GET,
Poco::Net::HTTPRequest::HTTP_OPTIONS},
Server, TransactionId, Internal,false) {}
static auto PathName() { return std::list<std::string>{"/api/v1/ws"};}
void DoGet() final;
void DoDelete() final {};
void DoPost() final {};
void DoPut() final {};
private:
};
inline void RESTAPI_webSocketServer::DoGet() {
try
{
if(Request->find("Upgrade") != Request->end() && Poco::icompare((*Request)["Upgrade"], "websocket") == 0) {
try
{
Poco::Net::WebSocket WS(*Request, *Response);
Logger().information("WebSocket connection established.");
auto Id = MicroService::CreateUUID();
WebSocketClientServer()->NewClient(WS,Id);
}
catch (...) {
std::cout << "Cannot create websocket client..." << std::endl;
}
}
} catch(...) {
std::cout << "Cannot upgrade connection..." << std::endl;
}
}
}
namespace OpenWifi::Utils {