Adding WebSocketServer to Framework

This commit is contained in:
stephb9959
2022-04-28 12:05:24 -07:00
parent 59c1f3afbb
commit 8de57f525c
15 changed files with 655 additions and 695 deletions

View File

@@ -102,7 +102,6 @@ add_executable(owprov
src/RESTAPI/RESTAPI_inventory_list_handler.cpp src/RESTAPI/RESTAPI_inventory_list_handler.h
src/RESTAPI/RESTAPI_entity_list_handler.cpp src/RESTAPI/RESTAPI_entity_list_handler.h
src/RESTAPI/RESTAPI_configurations_handler.cpp src/RESTAPI/RESTAPI_configurations_handler.h
src/RESTAPI/RESTAPI_webSocketServer.h src/RESTAPI/RESTAPI_webSocketServer.cpp
src/RESTAPI/RESTAPI_contact_list_handler.cpp src/RESTAPI/RESTAPI_contact_list_handler.h
src/RESTAPI/RESTAPI_location_list_handler.cpp src/RESTAPI/RESTAPI_location_list_handler.h
src/RESTAPI/RESTAPI_venue_list_handler.cpp src/RESTAPI/RESTAPI_venue_list_handler.h
@@ -126,7 +125,6 @@ add_executable(owprov
src/JobController.cpp src/JobController.h
src/JobRegistrations.cpp
src/storage/storage_jobs.cpp src/storage/storage_jobs.h
src/WebSocketClientServer.cpp src/WebSocketClientServer.h
src/storage/storage_maps.cpp src/storage/storage_maps.h
src/RESTAPI/RESTAPI_map_handler.cpp src/RESTAPI/RESTAPI_map_handler.h
src/RESTAPI/RESTAPI_map_list_handler.cpp src/RESTAPI/RESTAPI_map_list_handler.h
@@ -136,7 +134,7 @@ add_executable(owprov
src/storage/storage_variables.cpp src/storage/storage_variables.h
src/RESTAPI/RESTAPI_variables_handler.cpp src/RESTAPI/RESTAPI_variables_handler.h
src/RESTAPI/RESTAPI_variables_list_handler.cpp src/RESTAPI/RESTAPI_variables_list_handler.h
src/FileDownloader.cpp src/FileDownloader.h src/Tasks/VenueConfigUpdater.cpp src/Tasks/VenueConfigUpdater.h src/Kafka_ProvUpdater.cpp src/Kafka_ProvUpdater.h src/storage/storage_operataor.cpp src/storage/storage_operataor.h src/storage/storage_sub_devices.cpp src/storage/storage_sub_devices.h src/storage/storage_service_class.cpp src/storage/storage_service_class.h src/RESTAPI/RESTAPI_sub_devices_list_handler.cpp src/RESTAPI/RESTAPI_sub_devices_list_handler.h src/RESTAPI/RESTAPI_sub_devices_handler.cpp src/RESTAPI/RESTAPI_sub_devices_handler.h src/RESTAPI/RESTAPI_service_class_list_handler.cpp src/RESTAPI/RESTAPI_service_class_list_handler.h src/RESTAPI/RESTAPI_service_class_handler.cpp src/RESTAPI/RESTAPI_service_class_handler.h src/RESTAPI/RESTAPI_operators_list_handler.cpp src/RESTAPI/RESTAPI_operators_list_handler.h src/RESTAPI/RESTAPI_operators_handler.cpp src/RESTAPI/RESTAPI_operators_handler.h src/storage/storage_op_contacts.cpp src/storage/storage_op_contacts.h src/storage/storage_op_locations.cpp src/storage/storage_op_locations.h src/RESTAPI/RESTAPI_op_contact_list_handler.cpp src/RESTAPI/RESTAPI_op_contact_list_handler.h src/RESTAPI/RESTAPI_op_contact_handler.cpp src/RESTAPI/RESTAPI_op_contact_handler.h src/RESTAPI/RESTAPI_op_location_list_handler.cpp src/RESTAPI/RESTAPI_op_location_list_handler.h src/RESTAPI/RESTAPI_op_location_handler.cpp src/RESTAPI/RESTAPI_op_location_handler.h)
src/FileDownloader.cpp src/FileDownloader.h src/Tasks/VenueConfigUpdater.cpp src/Tasks/VenueConfigUpdater.h src/Kafka_ProvUpdater.cpp src/Kafka_ProvUpdater.h src/storage/storage_operataor.cpp src/storage/storage_operataor.h src/storage/storage_sub_devices.cpp src/storage/storage_sub_devices.h src/storage/storage_service_class.cpp src/storage/storage_service_class.h src/RESTAPI/RESTAPI_sub_devices_list_handler.cpp src/RESTAPI/RESTAPI_sub_devices_list_handler.h src/RESTAPI/RESTAPI_sub_devices_handler.cpp src/RESTAPI/RESTAPI_sub_devices_handler.h src/RESTAPI/RESTAPI_service_class_list_handler.cpp src/RESTAPI/RESTAPI_service_class_list_handler.h src/RESTAPI/RESTAPI_service_class_handler.cpp src/RESTAPI/RESTAPI_service_class_handler.h src/RESTAPI/RESTAPI_operators_list_handler.cpp src/RESTAPI/RESTAPI_operators_list_handler.h src/RESTAPI/RESTAPI_operators_handler.cpp src/RESTAPI/RESTAPI_operators_handler.h src/storage/storage_op_contacts.cpp src/storage/storage_op_contacts.h src/storage/storage_op_locations.cpp src/storage/storage_op_locations.h src/RESTAPI/RESTAPI_op_contact_list_handler.cpp src/RESTAPI/RESTAPI_op_contact_list_handler.h src/RESTAPI/RESTAPI_op_contact_handler.cpp src/RESTAPI/RESTAPI_op_contact_handler.h src/RESTAPI/RESTAPI_op_location_list_handler.cpp src/RESTAPI/RESTAPI_op_location_list_handler.h src/RESTAPI/RESTAPI_op_location_handler.cpp src/RESTAPI/RESTAPI_op_location_handler.h src/ProvWebSocketClient.cpp src/ProvWebSocketClient.h)
target_link_libraries(owprov PUBLIC
${Poco_LIBRARIES}

2
build
View File

@@ -1 +1 @@
101
106

View File

@@ -16,7 +16,6 @@
#include "framework/ConfigurationValidator.h"
#include "SerialNumberCache.h"
#include "JobController.h"
#include "WebSocketClientServer.h"
#include "FindCountry.h"
#include "Signup.h"
#include "DeviceTypeCache.h"
@@ -59,6 +58,8 @@ namespace OpenWifi {
FWRules_ = ProvObjects::dont_upgrade;
}
WebSocketProcessor_ = std::make_unique<ProvWebSocketClient>(logger());
AssetDir_ = MicroService::instance().DataDir() + "/wwwassets";
Poco::File DataDir(AssetDir_);
if(!DataDir.exists()) {

View File

@@ -18,6 +18,7 @@
#include "framework/MicroService.h"
#include "framework/OpenWifiTypes.h"
#include "RESTObjects/RESTAPI_ProvObjects.h"
#include "ProvWebSocketClient.h"
namespace OpenWifi {
@@ -49,6 +50,7 @@ namespace OpenWifi {
OpenWifi::ProvisioningDashboard DB_{};
ProvObjects::FIRMWARE_UPGRADE_RULES FWRules_{ProvObjects::dont_upgrade};
std::string AssetDir_;
std::unique_ptr<ProvWebSocketClient> WebSocketProcessor_;
};
inline Daemon * Daemon() { return Daemon::instance(); }

165
src/ProvWebSocketClient.cpp Normal file
View File

@@ -0,0 +1,165 @@
//
// Created by stephane bourque on 2022-04-28.
//
#include "ProvWebSocketClient.h"
#include "StorageService.h"
#include "SerialNumberCache.h"
#include "sdks/SDK_sec.h"
namespace OpenWifi {
ProvWebSocketClient::ProvWebSocketClient(Poco::Logger &Logger) :
Logger_(Logger){
WebSocketClientServer()->SetProcessor(this);
}
ProvWebSocketClient::~ProvWebSocketClient() {
WebSocketClientServer()->SetProcessor(nullptr);
}
void ProvWebSocketClient::ws_command_serial_number_search(const Poco::JSON::Object::Ptr &O,
bool &Done, std::string &Answer) {
Done = false;
auto Prefix = O->get("serial_prefix").toString();
Logger().information(Poco::format("serial_number_search: %s", Prefix));
if (!Prefix.empty() && Prefix.length() < 13) {
std::vector<uint64_t> Numbers;
SerialNumberCache()->FindNumbers(Prefix, 50, Numbers);
Poco::JSON::Array A;
for (const auto &i : Numbers)
A.add(Utils::int_to_hex(i));
Poco::JSON::Object A0;
A0.set("serialNumbers", A0);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(A, SS);
Answer = SS.str();
}
}
void ProvWebSocketClient::ws_command_address_completion(const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto Address = O->get("address").toString();
Answer = GoogleGeoCodeCall(Address);
}
void ProvWebSocketClient::ws_command_exit([[maybe_unused]] const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = true;
Answer = R"lit({ "closing" : "Goodbye! Aurevoir! Hasta la vista!" })lit";
}
void ProvWebSocketClient::ws_command_invalid([[maybe_unused]] const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
Answer = std::string{R"lit({ "error" : "invalid command" })lit"};
}
void ProvWebSocketClient::ws_command_subuser_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto operatorId = O->get("operatorId").toString();
std::string nameSearch, emailSearch;
OpenWifi::RESTAPIHandler::AssignIfPresent(O,"nameSearch",nameSearch);
OpenWifi::RESTAPIHandler::AssignIfPresent(O,"emailSearch",emailSearch);
SecurityObjects::UserInfoList Users;
SDK::Sec::Subscriber::Search(nullptr,operatorId,nameSearch,emailSearch,Users);
Poco::JSON::Array Arr;
for(const auto &i:Users.users) {
Poco::JSON::Object OO;
OO.set("name", i.name);
OO.set("email", i.email);
OO.set("id", i.id);
i.to_json(OO);
Arr.add(OO);
}
Poco::JSON::Object ObjAnswer;
ObjAnswer.set("users", Arr);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(ObjAnswer, SS);
Answer = SS.str();
}
void ProvWebSocketClient::ws_command_subdevice_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto operatorId = O->get("operatorId").toString();
auto Prefix = O->get("serial_prefix").toString();
std::string Query;
if(Prefix[0]=='*') {
Query = fmt::format(" operatorId='{}' and (right(serialNumber,{})='{}' or right(realMacAddress,{})='{}' ) ",
operatorId, Prefix.size()-1, Prefix.substr(1), Prefix.size()-1, Prefix.substr(1));
} else {
Query = fmt::format(" operatorId='{}' and (left(serialNumber,{})='{}' or left(realMacAddress,{})='{}' ) ",
operatorId, Prefix.size(), Prefix, Prefix.size(), Prefix);
}
std::vector<ProvObjects::SubscriberDevice> SubDevices;
StorageService()->SubscriberDeviceDB().GetRecords(0,200,SubDevices,Query);
Poco::JSON::Array Arr;
for(const auto &i:SubDevices) {
Arr.add(i.serialNumber);
}
Poco::JSON::Object RetObj;
RetObj.set("serialNumbers", Arr);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(RetObj, SS);
Answer = SS.str();
}
void ProvWebSocketClient::Processor(const Poco::JSON::Object::Ptr &O, std::string &Result, bool &Done ) {
try {
if (O->has("command") && O->has("id")) {
auto id = (uint64_t) O->get("id");
std::string Answer;
auto Command = O->get("command").toString();
if (Command == "serial_number_search" && O->has("serial_prefix")) {
ws_command_serial_number_search(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "address_completion" && O->has("address")) {
ws_command_address_completion(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "subuser_search" && O->has("operatorId")) {
ws_command_subuser_search(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "subdevice_search" && O->has("operatorId") && O->has("serial_prefix")) {
ws_command_subdevice_search(O,Done,Answer);
} else if (Command=="exit") {
ws_command_exit(O,Done,Answer);
} else {
ws_command_invalid(O,Done,Answer);
}
Result = fmt::format("{{ \"command_response_id\" : {} , \"response\" : {} }}" , id, Answer);
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
std::string ProvWebSocketClient::GoogleGeoCodeCall(const std::string &A) {
try {
std::string URI = { "https://maps.googleapis.com/maps/api/geocode/json"};
Poco::URI uri(URI);
uri.addQueryParameter("address",A);
uri.addQueryParameter("key", WebSocketClientServer()->GoogleApiKey());
Poco::Net::HTTPSClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1);
session.sendRequest(req);
Poco::Net::HTTPResponse res;
std::istream& rs = session.receiveResponse(res);
if(res.getStatus()==Poco::Net::HTTPResponse::HTTP_OK) {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return os.str();
} else {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return R"lit({ "error: )lit" + os.str() + R"lit( })lit";
}
} catch(...) {
}
return "{ \"error\" : \"No call made\" }";
}
}

28
src/ProvWebSocketClient.h Normal file
View File

@@ -0,0 +1,28 @@
//
// Created by stephane bourque on 2022-04-28.
//
#pragma once
#include "framework/MicroService.h"
namespace OpenWifi {
class ProvWebSocketClient : public WebSocketClientProcessor {
public:
explicit ProvWebSocketClient(Poco::Logger &Logger);
virtual ~ProvWebSocketClient();
virtual void Processor(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done );
void ws_command_serial_number_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_address_completion( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_exit( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_invalid( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_subuser_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_subdevice_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
std::string GoogleGeoCodeCall(const std::string &A);
private:
Poco::Logger & Logger_;
inline Poco::Logger & Logger() { return Logger_; }
};
}

View File

@@ -15,7 +15,6 @@
#include "RESTAPI/RESTAPI_entity_list_handler.h"
#include "RESTAPI/RESTAPI_configurations_handler.h"
#include "RESTAPI/RESTAPI_configurations_list_handler.h"
#include "RESTAPI/RESTAPI_webSocketServer.h"
#include "RESTAPI/RESTAPI_contact_list_handler.h"
#include "RESTAPI/RESTAPI_location_list_handler.h"
#include "RESTAPI/RESTAPI_venue_list_handler.h"

View File

@@ -1,104 +0,0 @@
//
// Created by stephane bourque on 2021-08-12.
//
#include "RESTAPI_webSocketServer.h"
#include "framework/MicroService.h"
#include "Poco/Net/WebSocket.h"
#include "Poco/Net/NetException.h"
#include "Poco/Net/HTTPResponse.h"
#include "Poco/JSON/Object.h"
#include "Poco/JSON/Stringifier.h"
#include "Poco/Net/HTTPSClientSession.h"
#include "SerialNumberCache.h"
#include "WebSocketClientServer.h"
namespace OpenWifi {
void RESTAPI_webSocketServer::DoGet() {
try
{
if(Request->find("Upgrade") != Request->end() && Poco::icompare((*Request)["Upgrade"], "websocket") == 0) {
try
{
Poco::Net::WebSocket WS(*Request, *Response);
Logger().information("WebSocket connection established.");
auto Id = MicroService::CreateUUID();
new WebSocketClient(WS,Id,Logger());
}
catch (...) {
std::cout << "Cannot create websocket client..." << std::endl;
}
}
} catch(...) {
std::cout << "Cannot upgrade connection..." << std::endl;
}
}
void RESTAPI_webSocketServer::Process(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done ) {
try {
if (O->has("command")) {
auto Command = O->get("command").toString();
if (Command == "serial_number_search" && O->has("serial_prefix")) {
auto Prefix = O->get("serial_prefix").toString();
Logger().information(Poco::format("serial_number_search: %s", Prefix));
if (!Prefix.empty() && Prefix.length() < 13) {
std::vector<uint64_t> Numbers;
SerialNumberCache()->FindNumbers(Prefix, 50, Numbers);
Poco::JSON::Array A;
for (const auto &i : Numbers)
A.add(Utils::int_to_hex(i));
Poco::JSON::Object AO;
AO.set("serialNumbers", A);
AO.set("command","serial_number_search");
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(AO, SS);
Answer = SS.str();
}
} else if (GeoCodeEnabled_ && Command == "address_completion" && O->has("address")) {
auto Address = O->get("address").toString();
Answer = GoogleGeoCodeCall(Address);
} else if (Command=="exit") {
Answer = R"lit({ "closing" : "Goodbye! Aurevoir! Hasta la vista!" })lit";
Done = true;
} else {
Answer = std::string{R"lit({ "error" : "invalid command" })lit"};
}
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
std::string RESTAPI_webSocketServer::GoogleGeoCodeCall(const std::string &A) {
try {
std::string URI = { "https://maps.googleapis.com/maps/api/geocode/json"};
Poco::URI uri(URI);
uri.addQueryParameter("address",A);
uri.addQueryParameter("key", GoogleApiKey_);
Poco::Net::HTTPSClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1);
session.sendRequest(req);
Poco::Net::HTTPResponse res;
std::istream& rs = session.receiveResponse(res);
if(res.getStatus()==Poco::Net::HTTPResponse::HTTP_OK) {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return os.str();
} else {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return R"lit({ "error: )lit" + os.str() + R"lit( })lit";
}
} catch(...) {
}
return "{ \"error\" : \"No call made\" }";
}
}

View File

@@ -1,29 +0,0 @@
//
// Created by stephane bourque on 2021-08-12.
//
#pragma once
#include "framework/MicroService.h"
namespace OpenWifi {
class RESTAPI_webSocketServer : public RESTAPIHandler {
public:
RESTAPI_webSocketServer(const RESTAPIHandler::BindingMap &bindings, Poco::Logger &L, RESTAPI_GenericServer &Server, uint64_t TransactionId, bool Internal)
: RESTAPIHandler(bindings, L,
std::vector<std::string>{ Poco::Net::HTTPRequest::HTTP_GET,
Poco::Net::HTTPRequest::HTTP_OPTIONS},
Server, TransactionId, Internal,false) {}
static auto PathName() { return std::list<std::string>{"/api/v1/ws"};}
void DoGet() final;
void DoDelete() final {};
void DoPost() final {};
void DoPut() final {};
private:
void Process(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done);
std::string GoogleGeoCodeCall(const std::string &A);
bool GeoCodeEnabled_=false;
std::string GoogleApiKey_;
};
}

View File

@@ -996,14 +996,14 @@ namespace OpenWifi::ProvObjects {
return false;
}
void CompleteDeviceConfiguration::to_json(Poco::JSON::Object &Obj) const {
void ConfigurationDetails::to_json(Poco::JSON::Object &Obj) const {
field_to_json( Obj,"configuration", configuration);
field_to_json( Obj,"rrm", rrm);
field_to_json( Obj,"firmwareRCOnly", firmwareRCOnly);
field_to_json( Obj,"firmwareUpgrade", firmwareUpgrade);
}
bool CompleteDeviceConfiguration::from_json(const Poco::JSON::Object::Ptr &Obj) {
bool ConfigurationDetails::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
field_from_json( Obj,"configuration", configuration);
field_from_json( Obj,"rrm", rrm);
@@ -1153,47 +1153,5 @@ namespace OpenWifi::ProvObjects {
return true;
}
void WebSocketNotificationContent::to_json(Poco::JSON::Object &Obj) const {
field_to_json(Obj,"title",title);
field_to_json(Obj,"type",type);
field_to_json(Obj,"success",success);
field_to_json(Obj,"error",error);
field_to_json(Obj,"warning",warning);
field_to_json(Obj,"timeStamp",timeStamp);
field_to_json(Obj,"details",details);
}
bool WebSocketNotificationContent::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
field_from_json(Obj,"title",title);
field_from_json(Obj,"type",type);
field_from_json(Obj,"success",success);
field_from_json(Obj,"error",error);
field_from_json(Obj,"warning",warning);
field_from_json(Obj,"timeStamp",timeStamp);
field_from_json(Obj,"details",details);
return true;
} catch(...) {
}
return false;
}
void WebSocketNotification::to_json(Poco::JSON::Object &Obj) const {
field_to_json(Obj,"notification_id",notification_id);
field_to_json(Obj,"content",content);
}
bool WebSocketNotification::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
field_from_json(Obj,"notification_id",notification_id);
field_from_json(Obj,"content",content);
return true;
} catch(...) {
}
return false;
}
}

View File

@@ -633,7 +633,7 @@ namespace OpenWifi::ProvObjects {
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
struct CompleteDeviceConfiguration {
struct ConfigurationDetails {
DeviceConfigurationElementVec configuration;
std::string rrm{"inherit"};
std::string firmwareUpgrade{"inherit"};
@@ -676,28 +676,6 @@ namespace OpenWifi::ProvObjects {
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
struct WebSocketNotificationContent {
std::string title,
type,
details;
std::vector<std::string> success,
error,
warning;
uint64_t timeStamp=std::time(nullptr);
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
struct WebSocketNotification {
inline static uint64_t xid=1;
uint64_t notification_id=++xid;
WebSocketNotificationContent content;
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
bool UpdateObjectInfo(const Poco::JSON::Object::Ptr &O, const SecurityObjects::UserInfo &U, ObjectInfo &I);
bool CreateObjectInfo(const Poco::JSON::Object::Ptr &O, const SecurityObjects::UserInfo &U, ObjectInfo &I);
bool CreateObjectInfo(const SecurityObjects::UserInfo &U, ObjectInfo &I);

View File

@@ -8,7 +8,6 @@
#include "StorageService.h"
#include "APConfig.h"
#include "sdks/SDK_gw.h"
#include "WebSocketClientServer.h"
namespace OpenWifi {
@@ -113,7 +112,7 @@ namespace OpenWifi {
if(When_ && When_>OpenWifi::Now())
Poco::Thread::trySleep( (long) (When_ - OpenWifi::Now()) * 1000 );
ProvObjects::WebSocketNotification N;
WebSocketNotification N;
N.content.type = "venue_configuration_update";
Logger().information(fmt::format("Job {} Starting.", JobId_));

View File

@@ -1,300 +0,0 @@
//
// Created by stephane bourque on 2021-11-01.
//
#include "framework/MicroService.h"
#include "WebSocketClientServer.h"
#include "SerialNumberCache.h"
#include "StorageService.h"
#include "sdks/SDK_sec.h"
namespace OpenWifi {
void WebSocketClientServer::run() {
Running_ = true ;
while(Running_) {
Poco::Thread::trySleep(2000);
if(!Running_)
break;
}
};
int WebSocketClientServer::Start() {
GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty();
ReactorPool_ = std::make_unique<MyParallelSocketReactor>();
Thr_.start(*this);
return 0;
};
void WebSocketClientServer::Stop() {
if(Running_) {
Running_ = false;
Thr_.wakeUp();
Thr_.join();
}
};
bool WebSocketClientServer::SendUserNotification(const std::string &userName,
const ProvObjects::WebSocketNotification &Notification) {
Poco::JSON::Object Payload;
Notification.to_json(Payload);
Poco::JSON::Object Msg;
Msg.set("notification",Payload);
std::ostringstream OO;
Msg.stringify(OO);
// std::cout << std::endl << OO.str() << std::endl;
return SendToUser(userName,OO.str());
}
void WebSocketClient::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
delete this;
}
bool WebSocketClientServer::Send(const std::string &Id, const std::string &Payload) {
std::lock_guard G(Mutex_);
auto It = Clients_.find(Id);
if(It!=Clients_.end())
return It->second.first->Send(Payload);
return false;
}
bool WebSocketClientServer::SendToUser(const std::string &UserName, const std::string &Payload) {
std::lock_guard G(Mutex_);
uint64_t Sent=0;
for(const auto &client:Clients_) {
if(client.second.second == UserName) {
if(client.second.first->Send(Payload))
Sent++;
}
}
return Sent>0;
}
void WebSocketClient::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) {
int flags;
int n;
bool Done=false;
Poco::Buffer<char> IncomingFrame(0);
n = WS_->receiveFrame(IncomingFrame, flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if(n==0) {
return delete this;
}
switch(Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: {
WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
}
break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
}
break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
Logger().warning(Poco::format("CLOSE(%s): Client is closing its connection.",Id_));
Done=true;
}
break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
IncomingFrame.append(0);
if(!Authenticated_) {
std::string Frame{IncomingFrame.begin()};
auto Tokens = Utils::Split(Frame,':');
bool Expired = false, Contacted = false;
if(Tokens.size()==2 && AuthClient()->IsAuthorized(Tokens[1], UserInfo_, Expired, Contacted)) {
Authenticated_=true;
std::string S{"Welcome! Bienvenue! Bienvenidos!"};
WS_->sendFrame(S.c_str(),S.size());
WebSocketClientServer()->SetUser(Id_,UserInfo_.userinfo.email);
} else {
std::string S{"Invalid token. Closing connection."};
WS_->sendFrame(S.c_str(),S.size());
Done=true;
}
} else {
try {
Poco::JSON::Parser P;
auto Obj = P.parse(IncomingFrame.begin())
.extract<Poco::JSON::Object::Ptr>();
std::string Answer;
Process(Obj, Answer, Done );
if (!Answer.empty())
WS_->sendFrame(Answer.c_str(), (int) Answer.size());
else {
WS_->sendFrame("{}", 2);
}
} catch (const Poco::JSON::JSONException & E) {
Logger().log(E);
}
}
}
break;
default:
{
}
}
if(Done) {
delete this;
}
}
void WebSocketClient::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
delete this;
}
void WebSocketClient::ws_command_serial_number_search(const Poco::JSON::Object::Ptr &O,
bool &Done, std::string &Answer) {
Done = false;
auto Prefix = O->get("serial_prefix").toString();
Logger().information(Poco::format("serial_number_search: %s", Prefix));
if (!Prefix.empty() && Prefix.length() < 13) {
std::vector<uint64_t> Numbers;
SerialNumberCache()->FindNumbers(Prefix, 50, Numbers);
Poco::JSON::Array A;
for (const auto &i : Numbers)
A.add(Utils::int_to_hex(i));
Poco::JSON::Object A0;
A0.set("serialNumbers", A0);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(A, SS);
Answer = SS.str();
}
}
void
WebSocketClient::ws_command_address_completion(const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto Address = O->get("address").toString();
Answer = GoogleGeoCodeCall(Address);
}
void WebSocketClient::ws_command_exit([[maybe_unused]] const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = true;
Answer = R"lit({ "closing" : "Goodbye! Aurevoir! Hasta la vista!" })lit";
}
void WebSocketClient::ws_command_invalid([[maybe_unused]] const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
Answer = std::string{R"lit({ "error" : "invalid command" })lit"};
}
void WebSocketClient::ws_command_subuser_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto operatorId = O->get("operatorId").toString();
std::string nameSearch, emailSearch;
OpenWifi::RESTAPIHandler::AssignIfPresent(O,"nameSearch",nameSearch);
OpenWifi::RESTAPIHandler::AssignIfPresent(O,"emailSearch",emailSearch);
SecurityObjects::UserInfoList Users;
SDK::Sec::Subscriber::Search(nullptr,operatorId,nameSearch,emailSearch,Users);
Poco::JSON::Array Arr;
for(const auto &i:Users.users) {
Poco::JSON::Object OO;
OO.set("name", i.name);
OO.set("email", i.email);
OO.set("id", i.id);
i.to_json(OO);
Arr.add(OO);
}
Poco::JSON::Object ObjAnswer;
ObjAnswer.set("users", Arr);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(ObjAnswer, SS);
Answer = SS.str();
}
void WebSocketClient::ws_command_subdevice_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer) {
Done = false;
auto operatorId = O->get("operatorId").toString();
auto Prefix = O->get("serial_prefix").toString();
std::string Query;
if(Prefix[0]=='*') {
Query = fmt::format(" operatorId='{}' and (right(serialNumber,{})='{}' or right(realMacAddress,{})='{}' ) ",
operatorId, Prefix.size()-1, Prefix.substr(1), Prefix.size()-1, Prefix.substr(1));
} else {
Query = fmt::format(" operatorId='{}' and (left(serialNumber,{})='{}' or left(realMacAddress,{})='{}' ) ",
operatorId, Prefix.size(), Prefix, Prefix.size(), Prefix);
}
std::vector<ProvObjects::SubscriberDevice> SubDevices;
StorageService()->SubscriberDeviceDB().GetRecords(0,200,SubDevices,Query);
Poco::JSON::Array Arr;
for(const auto &i:SubDevices) {
Arr.add(i.serialNumber);
}
Poco::JSON::Object RetObj;
RetObj.set("serialNumbers", Arr);
std::ostringstream SS;
Poco::JSON::Stringifier::stringify(RetObj, SS);
Answer = SS.str();
}
void WebSocketClient::Process(const Poco::JSON::Object::Ptr &O, std::string &Result, bool &Done ) {
try {
if (O->has("command") && O->has("id")) {
auto id = (uint64_t) O->get("id");
std::string Answer;
auto Command = O->get("command").toString();
if (Command == "serial_number_search" && O->has("serial_prefix")) {
ws_command_serial_number_search(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "address_completion" && O->has("address")) {
ws_command_address_completion(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "subuser_search" && O->has("operatorId")) {
ws_command_subuser_search(O,Done,Answer);
} else if (WebSocketClientServer()->GeoCodeEnabled() && Command == "subdevice_search" && O->has("operatorId") && O->has("serial_prefix")) {
ws_command_subdevice_search(O,Done,Answer);
} else if (Command=="exit") {
ws_command_exit(O,Done,Answer);
} else {
ws_command_invalid(O,Done,Answer);
}
Result = fmt::format("{{ \"command_response_id\" : {} , \"response\" : {} }}" , id, Answer);
}
} catch (const Poco::Exception &E) {
Logger().log(E);
}
}
std::string WebSocketClient::GoogleGeoCodeCall(const std::string &A) {
try {
std::string URI = { "https://maps.googleapis.com/maps/api/geocode/json"};
Poco::URI uri(URI);
uri.addQueryParameter("address",A);
uri.addQueryParameter("key", WebSocketClientServer()->GoogleApiKey());
Poco::Net::HTTPSClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1);
session.sendRequest(req);
Poco::Net::HTTPResponse res;
std::istream& rs = session.receiveResponse(res);
if(res.getStatus()==Poco::Net::HTTPResponse::HTTP_OK) {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return os.str();
} else {
std::ostringstream os;
Poco::StreamCopier::copyStream(rs,os);
return R"lit({ "error: )lit" + os.str() + R"lit( })lit";
}
} catch(...) {
}
return "{ \"error\" : \"No call made\" }";
}
}

View File

@@ -1,183 +0,0 @@
//
// Created by stephane bourque on 2021-11-01.
//
#pragma once
#include "framework/MicroService.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/WebSocket.h"
#include "Poco/Environment.h"
#include "Poco/NObserver.h"
#include "Poco/Net/SocketNotification.h"
#include "RESTObjects/RESTAPI_SecurityObjects.h"
#include "RESTObjects/RESTAPI_ProvObjects.h"
namespace OpenWifi {
class MyParallelSocketReactor {
public:
explicit MyParallelSocketReactor(uint32_t NumReactors=8) :
NumReactors_(NumReactors)
{
Reactors_ = new Poco::Net::SocketReactor[NumReactors_];
for(uint32_t i=0;i<NumReactors_;i++) {
ReactorPool_.start(Reactors_[i]);
}
}
~MyParallelSocketReactor() {
for(uint32_t i=0;i<NumReactors_;i++) {
Reactors_[i].stop();
}
ReactorPool_.stopAll();
ReactorPool_.joinAll();
delete [] Reactors_;
}
Poco::Net::SocketReactor & Reactor() {
return Reactors_[ rand() % NumReactors_ ];
}
private:
uint32_t NumReactors_;
Poco::Net::SocketReactor * Reactors_;
Poco::ThreadPool ReactorPool_;
};
class WebSocketClient;
class WebSocketClientServer : public SubSystemServer, Poco::Runnable {
public:
static auto instance() {
static auto instance_ = new WebSocketClientServer;
return instance_;
}
int Start() override;
void Stop() override;
void run() override;
inline MyParallelSocketReactor & ReactorPool() { return *ReactorPool_; }
inline bool Register( WebSocketClient * Client, const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_[Id] = std::make_pair(Client,"");
return true;
}
inline void UnRegister(const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_.erase(Id);
}
inline void SetUser(const std::string &Id, const std::string &UserId) {
std::lock_guard G(Mutex_);
auto it=Clients_.find(Id);
if(it!=Clients_.end()) {
Clients_[Id] = std::make_pair(it->second.first,UserId);
}
}
inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; }
[[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; }
[[nodiscard]] bool Send(const std::string &Id, const std::string &Payload);
[[nodiscard]] bool SendUserNotification(const std::string &userName, const ProvObjects::WebSocketNotification & Notification);
private:
std::atomic_bool Running_=false;
Poco::Thread Thr_;
std::unique_ptr<MyParallelSocketReactor> ReactorPool_;
bool GeoCodeEnabled_=false;
std::string GoogleApiKey_;
std::map<std::string, std::pair<WebSocketClient *,std::string>> Clients_;
[[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload);
WebSocketClientServer() noexcept:
SubSystemServer("WebSocketClientServer", "WSCLNT-SVR", "websocketclients")
{
}
};
inline auto WebSocketClientServer() { return WebSocketClientServer::instance(); }
class WebSocketClient {
public:
explicit WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L) :
Reactor_(WebSocketClientServer()->ReactorPool().Reactor()),
Id_(Id),
Logger_(L) {
try {
WS_ = std::make_unique<Poco::Net::WebSocket>(WS);
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ReadableNotification>(
*this, &WebSocketClient::OnSocketReadable));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ShutdownNotification>(
*this, &WebSocketClient::OnSocketShutdown));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ErrorNotification>(
*this, &WebSocketClient::OnSocketError));
WebSocketClientServer()->Register(this, Id_);
} catch (...) {
delete this;
}
}
~WebSocketClient() {
try {
WebSocketClientServer()->UnRegister(Id_);
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ReadableNotification>(*this,&WebSocketClient::OnSocketReadable));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ShutdownNotification>(*this,&WebSocketClient::OnSocketShutdown));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ErrorNotification>(*this,&WebSocketClient::OnSocketError));
(*WS_).shutdown();
(*WS_).close();
} catch(...) {
}
}
void ws_command_serial_number_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_address_completion( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_exit( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_invalid( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_subuser_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
void ws_command_subdevice_search( const Poco::JSON::Object::Ptr &O, bool &Done, std::string &Answer);
[[nodiscard]] inline const std::string & Id() { return Id_; };
std::string GoogleGeoCodeCall(const std::string &A);
[[nodiscard]] inline bool Send(const std::string &Payload) {
try {
WS_->sendFrame(Payload.c_str(),Payload.size());
return true;
} catch (...) {
}
return false;
}
Poco::Logger & Logger() { return Logger_;}
private:
std::unique_ptr<Poco::Net::WebSocket> WS_;
Poco::Net::SocketReactor & Reactor_;
std::string Id_;
Poco::Logger & Logger_;
bool Authenticated_=false;
SecurityObjects::UserInfoAndPolicy UserInfo_;
void Process(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done );
void OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification>& pNf);
void OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification>& pNf);
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification>& pNf);
};
}

View File

@@ -73,6 +73,11 @@ using namespace std::chrono_literals;
#include "Poco/SplitterChannel.h"
#include "Poco/JWT/Signer.h"
#include "Poco/DeflatingStream.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/WebSocket.h"
#include "Poco/Environment.h"
#include "Poco/NObserver.h"
#include "Poco/Net/SocketNotification.h"
#include "cppkafka/cppkafka.h"
@@ -4560,6 +4565,449 @@ namespace OpenWifi {
}
inline MicroService * MicroService::instance_ = nullptr;
struct WebSocketNotificationContent {
std::string title,
type,
details;
std::vector<std::string> success,
error,
warning;
uint64_t timeStamp=std::time(nullptr);
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
struct WebSocketNotification {
inline static uint64_t xid=1;
uint64_t notification_id=++xid;
WebSocketNotificationContent content;
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};
inline void WebSocketNotificationContent::to_json(Poco::JSON::Object &Obj) const {
RESTAPI_utils::field_to_json(Obj,"title",title);
RESTAPI_utils::field_to_json(Obj,"type",type);
RESTAPI_utils::field_to_json(Obj,"success",success);
RESTAPI_utils::field_to_json(Obj,"error",error);
RESTAPI_utils::field_to_json(Obj,"warning",warning);
RESTAPI_utils::field_to_json(Obj,"timeStamp",timeStamp);
RESTAPI_utils::field_to_json(Obj,"details",details);
}
inline bool WebSocketNotificationContent::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
RESTAPI_utils::field_from_json(Obj,"title",title);
RESTAPI_utils::field_from_json(Obj,"type",type);
RESTAPI_utils::field_from_json(Obj,"success",success);
RESTAPI_utils::field_from_json(Obj,"error",error);
RESTAPI_utils::field_from_json(Obj,"warning",warning);
RESTAPI_utils::field_from_json(Obj,"timeStamp",timeStamp);
RESTAPI_utils::field_from_json(Obj,"details",details);
return true;
} catch(...) {
}
return false;
}
inline void WebSocketNotification::to_json(Poco::JSON::Object &Obj) const {
RESTAPI_utils::field_to_json(Obj,"notification_id",notification_id);
RESTAPI_utils::field_to_json(Obj,"content",content);
}
inline bool WebSocketNotification::from_json(const Poco::JSON::Object::Ptr &Obj) {
try {
RESTAPI_utils::field_from_json(Obj,"notification_id",notification_id);
RESTAPI_utils::field_from_json(Obj,"content",content);
return true;
} catch(...) {
}
return false;
}
class WebSocketClientProcessor {
public:
virtual void Processor(const Poco::JSON::Object::Ptr &O, std::string &Answer, bool &Done ) = 0;
private:
};
class MyParallelSocketReactor {
public:
explicit MyParallelSocketReactor(uint32_t NumReactors = 8);
~MyParallelSocketReactor();
Poco::Net::SocketReactor &Reactor();
private:
uint32_t NumReactors_;
Poco::Net::SocketReactor *Reactors_;
Poco::ThreadPool ReactorPool_;
};
class WebSocketClient;
class WebSocketClientServer : public SubSystemServer, Poco::Runnable {
public:
static auto instance() {
static auto instance_ = new WebSocketClientServer;
return instance_;
}
int Start() override;
void Stop() override;
void run() override;
MyParallelSocketReactor &ReactorPool();
void NewClient(Poco::Net::WebSocket &WS, const std::string &Id);
bool Register(WebSocketClient *Client, const std::string &Id);
void SetProcessor(WebSocketClientProcessor *F);
void UnRegister(const std::string &Id);
void SetUser(const std::string &Id, const std::string &UserId);
[[nodiscard]] inline bool GeoCodeEnabled() const { return GeoCodeEnabled_; }
[[nodiscard]] inline std::string GoogleApiKey() const { return GoogleApiKey_; }
[[nodiscard]] bool Send(const std::string &Id, const std::string &Payload);
[[nodiscard]] bool
SendUserNotification(const std::string &userName, const WebSocketNotification &Notification);
[[nodiscard]] bool SendToUser(const std::string &userName, const std::string &Payload);
private:
std::atomic_bool Running_ = false;
Poco::Thread Thr_;
std::unique_ptr<MyParallelSocketReactor> ReactorPool_;
bool GeoCodeEnabled_ = false;
std::string GoogleApiKey_;
std::map<std::string, std::pair<WebSocketClient *, std::string>> Clients_;
WebSocketClientProcessor *Processor_ = nullptr;
WebSocketClientServer() noexcept;
};
inline auto WebSocketClientServer() { return WebSocketClientServer::instance(); }
class WebSocketClient {
public:
explicit WebSocketClient(Poco::Net::WebSocket &WS, const std::string &Id, Poco::Logger &L,
WebSocketClientProcessor *Processor);
virtual ~WebSocketClient();
[[nodiscard]] inline const std::string &Id();
[[nodiscard]] Poco::Logger &Logger();
[[nodiscard]] inline bool Send(const std::string &Payload);
private:
std::unique_ptr<Poco::Net::WebSocket> WS_;
Poco::Net::SocketReactor &Reactor_;
std::string Id_;
Poco::Logger &Logger_;
bool Authenticated_ = false;
SecurityObjects::UserInfoAndPolicy UserInfo_;
WebSocketClientProcessor *Processor_ = nullptr;
void OnSocketReadable(const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf);
void OnSocketShutdown(const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf);
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf);
};
inline MyParallelSocketReactor::MyParallelSocketReactor(uint32_t NumReactors) :
NumReactors_(NumReactors)
{
Reactors_ = new Poco::Net::SocketReactor[NumReactors_];
for(uint32_t i=0;i<NumReactors_;i++) {
ReactorPool_.start(Reactors_[i]);
}
}
inline MyParallelSocketReactor::~MyParallelSocketReactor() {
for(uint32_t i=0;i<NumReactors_;i++) {
Reactors_[i].stop();
}
ReactorPool_.stopAll();
ReactorPool_.joinAll();
delete [] Reactors_;
}
inline Poco::Net::SocketReactor & MyParallelSocketReactor::Reactor() {
return Reactors_[ rand() % NumReactors_ ];
}
inline MyParallelSocketReactor & WebSocketClientServer::ReactorPool() { return *ReactorPool_; }
inline void WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id) {
std::lock_guard G(Mutex_);
auto Client = new WebSocketClient(WS,Id,Logger(), Processor_);
Clients_[Id] = std::make_pair(Client,"");
}
inline bool WebSocketClientServer::Register( WebSocketClient * Client, const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_[Id] = std::make_pair(Client,"");
return true;
}
inline void WebSocketClientServer::SetProcessor( WebSocketClientProcessor * F) {
Processor_ = F;
}
inline void WebSocketClientServer::UnRegister(const std::string &Id) {
std::lock_guard G(Mutex_);
Clients_.erase(Id);
}
inline void WebSocketClientServer::SetUser(const std::string &Id, const std::string &UserId) {
std::lock_guard G(Mutex_);
auto it=Clients_.find(Id);
if(it!=Clients_.end()) {
Clients_[Id] = std::make_pair(it->second.first,UserId);
}
}
[[nodiscard]] inline bool SendToUser(const std::string &userName, const std::string &Payload);
inline WebSocketClientServer::WebSocketClientServer() noexcept:
SubSystemServer("WebSocketClientServer", "WSCLNT-SVR", "websocketclients")
{
}
inline void WebSocketClientServer::run() {
Running_ = true ;
while(Running_) {
Poco::Thread::trySleep(2000);
if(!Running_)
break;
}
};
inline int WebSocketClientServer::Start() {
GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty();
ReactorPool_ = std::make_unique<MyParallelSocketReactor>();
Thr_.start(*this);
return 0;
};
inline void WebSocketClientServer::Stop() {
if(Running_) {
Running_ = false;
Thr_.wakeUp();
Thr_.join();
}
};
inline bool WebSocketClientServer::SendUserNotification(const std::string &userName,
const WebSocketNotification &Notification) {
Poco::JSON::Object Payload;
Notification.to_json(Payload);
Poco::JSON::Object Msg;
Msg.set("notification",Payload);
std::ostringstream OO;
Msg.stringify(OO);
// std::cout << std::endl << OO.str() << std::endl;
return SendToUser(userName,OO.str());
}
inline void WebSocketClient::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
delete this;
}
inline bool WebSocketClientServer::Send(const std::string &Id, const std::string &Payload) {
std::lock_guard G(Mutex_);
auto It = Clients_.find(Id);
if(It!=Clients_.end())
return It->second.first->Send(Payload);
return false;
}
inline bool WebSocketClientServer::SendToUser(const std::string &UserName, const std::string &Payload) {
std::lock_guard G(Mutex_);
uint64_t Sent=0;
for(const auto &client:Clients_) {
if(client.second.second == UserName) {
if(client.second.first->Send(Payload))
Sent++;
}
}
return Sent>0;
}
inline void WebSocketClient::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) {
int flags;
int n;
bool Done=false;
Poco::Buffer<char> IncomingFrame(0);
n = WS_->receiveFrame(IncomingFrame, flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if(n==0) {
return delete this;
}
switch(Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: {
WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
}
break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
}
break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
Logger().warning(Poco::format("CLOSE(%s): Client is closing its connection.",Id_));
Done=true;
}
break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
IncomingFrame.append(0);
if(!Authenticated_) {
std::string Frame{IncomingFrame.begin()};
auto Tokens = Utils::Split(Frame,':');
bool Expired = false, Contacted = false;
if(Tokens.size()==2 && AuthClient()->IsAuthorized(Tokens[1], UserInfo_, Expired, Contacted)) {
Authenticated_=true;
std::string S{"Welcome! Bienvenue! Bienvenidos!"};
WS_->sendFrame(S.c_str(),S.size());
WebSocketClientServer()->SetUser(Id_,UserInfo_.userinfo.email);
} else {
std::string S{"Invalid token. Closing connection."};
WS_->sendFrame(S.c_str(),S.size());
Done=true;
}
} else {
try {
Poco::JSON::Parser P;
auto Obj = P.parse(IncomingFrame.begin())
.extract<Poco::JSON::Object::Ptr>();
std::string Answer;
if(Processor_!= nullptr)
Processor_->Processor(Obj, Answer, Done);
if (!Answer.empty())
WS_->sendFrame(Answer.c_str(), (int) Answer.size());
else {
WS_->sendFrame("{}", 2);
}
} catch (const Poco::JSON::JSONException & E) {
Logger().log(E);
}
}
}
break;
default:
{
}
}
if(Done) {
delete this;
}
}
inline void WebSocketClient::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
delete this;
}
inline WebSocketClient::WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L, WebSocketClientProcessor * Processor) :
Reactor_(WebSocketClientServer()->ReactorPool().Reactor()),
Id_(Id),
Logger_(L),
Processor_(Processor) {
try {
WS_ = std::make_unique<Poco::Net::WebSocket>(WS);
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ReadableNotification>(
*this, &WebSocketClient::OnSocketReadable));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ShutdownNotification>(
*this, &WebSocketClient::OnSocketShutdown));
Reactor_.addEventHandler(*WS_,
Poco::NObserver<WebSocketClient, Poco::Net::ErrorNotification>(
*this, &WebSocketClient::OnSocketError));
// WebSocketClientServer()->Register(this, Id_);
} catch (...) {
delete this;
}
}
inline WebSocketClient::~WebSocketClient() {
try {
WebSocketClientServer()->UnRegister(Id_);
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ReadableNotification>(*this,&WebSocketClient::OnSocketReadable));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ShutdownNotification>(*this,&WebSocketClient::OnSocketShutdown));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<WebSocketClient,
Poco::Net::ErrorNotification>(*this,&WebSocketClient::OnSocketError));
(*WS_).shutdown();
(*WS_).close();
WebSocketClientServer()->UnRegister(Id_);
} catch(...) {
}
}
[[nodiscard]] inline const std::string & WebSocketClient::Id() {
return Id_;
};
[[nodiscard]] inline Poco::Logger & WebSocketClient::Logger() {
return Logger_;
}
[[nodiscard]] inline bool WebSocketClient::Send(const std::string &Payload) {
try {
WS_->sendFrame(Payload.c_str(),Payload.size());
return true;
} catch (...) {
}
return false;
}
class RESTAPI_webSocketServer : public RESTAPIHandler {
public:
inline RESTAPI_webSocketServer(const RESTAPIHandler::BindingMap &bindings, Poco::Logger &L, RESTAPI_GenericServer &Server, uint64_t TransactionId, bool Internal)
: RESTAPIHandler(bindings, L,
std::vector<std::string>{ Poco::Net::HTTPRequest::HTTP_GET,
Poco::Net::HTTPRequest::HTTP_OPTIONS},
Server, TransactionId, Internal,false) {}
static auto PathName() { return std::list<std::string>{"/api/v1/ws"};}
void DoGet() final;
void DoDelete() final {};
void DoPost() final {};
void DoPut() final {};
private:
};
inline void RESTAPI_webSocketServer::DoGet() {
try
{
if(Request->find("Upgrade") != Request->end() && Poco::icompare((*Request)["Upgrade"], "websocket") == 0) {
try
{
Poco::Net::WebSocket WS(*Request, *Response);
Logger().information("WebSocket connection established.");
auto Id = MicroService::CreateUUID();
WebSocketClientServer()->NewClient(WS,Id);
}
catch (...) {
std::cout << "Cannot create websocket client..." << std::endl;
}
}
} catch(...) {
std::cout << "Cannot upgrade connection..." << std::endl;
}
}
}
namespace OpenWifi::Utils {