stephb9959
2022-09-20 15:11:14 -07:00
parent 05767cc1a7
commit d77a4a6bb9
4 changed files with 84 additions and 39 deletions

View File

@@ -94,7 +94,7 @@ namespace OpenWifi {
if (!SS->secure()) {
poco_trace(Logger_,fmt::format("CONNECTION({}): Connection is NOT secure. Device is not allowed.", CId_));
return delete this;
return EndConnection();
} else {
poco_debug(Logger_,fmt::format("CONNECTION({}): Connection is secure.", CId_));
}
@@ -109,23 +109,23 @@ namespace OpenWifi {
} else {
State_.VerifiedCertificate = GWObjects::NO_CERTIFICATE;
poco_trace(Logger_,fmt::format("CONNECTION({}): Device certificate is not valid. Device is not allowed.", CId_));
return delete this;
return EndConnection();
}
} catch (const Poco::Exception &E) {
LogException(E);
poco_error(Logger_,fmt::format("CONNECTION({}): Device certificate is not valid. Device is not allowed.", CId_));
return delete this;
return EndConnection();
}
} else {
State_.VerifiedCertificate = GWObjects::NO_CERTIFICATE;
poco_error(Logger_,fmt::format("CONNECTION({}): No certificates available..", CId_));
return delete this;
return EndConnection();
}
if (AP_WS_Server::IsSim(CN_) && !AP_WS_Server()->IsSimEnabled()) {
poco_warning(Logger_,fmt::format(
"CONNECTION({}): Sim Device {} is not allowed. Disconnecting.", CId_, CN_));
return delete this;
return EndConnection();
}
SerialNumber_ = CN_;
@@ -134,7 +134,7 @@ namespace OpenWifi {
if (!CN_.empty() && StorageService()->IsBlackListed(SerialNumber_)) {
poco_warning(Logger_,fmt::format("CONNECTION({}): Device {} is black listed. Disconnecting.",
CId_, CN_));
return delete this;
return EndConnection();
}
WS_->setMaxPayloadSize(BufSize);
@@ -187,7 +187,7 @@ namespace OpenWifi {
poco_error(Logger_,fmt::format("CONNECTION({}): Exception caught during device connection. Device will have to retry. Unsecure connect denied.",
CId_));
}
return delete this;
return EndConnection();
}
static void NotifyKafkaDisconnect(const std::string & SerialNumber) {
@@ -206,7 +206,8 @@ namespace OpenWifi {
}
AP_WS_Connection::~AP_WS_Connection() {
std::cout << "Garbage collection Session: " << State_.sessionId << std::endl;
/*
poco_information(Logger_,fmt::format("CONNECTION-CLOSING({}): {}.", CId_, SerialNumber_));
auto SessionDeleted = DeviceRegistry()->EndSession(State_.sessionId, this, SerialNumberInt_);
@@ -232,6 +233,37 @@ namespace OpenWifi {
}
if(SessionDeleted)
WebSocketClientNotificationDeviceDisconnected(SerialNumber_);
*/
}
void AP_WS_Connection::EndConnection() {
poco_information(Logger_,fmt::format("CONNECTION-CLOSING({}): {}.", CId_, SerialNumber_));
auto SessionDeleted = DeviceRegistry()->EndSession(State_.sessionId, this, SerialNumberInt_);
if (Registered_ && WS_) {
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<AP_WS_Connection, Poco::Net::ReadableNotification>(
*this, &AP_WS_Connection::OnSocketReadable));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<AP_WS_Connection, Poco::Net::ShutdownNotification>(
*this, &AP_WS_Connection::OnSocketShutdown));
Reactor_.removeEventHandler(*WS_,
Poco::NObserver<AP_WS_Connection, Poco::Net::ErrorNotification>(
*this, &AP_WS_Connection::OnSocketError));
(*WS_).close();
} else if (WS_) {
(*WS_).close();
}
if (KafkaManager()->Enabled() && !SerialNumber_.empty()) {
std::string s(SerialNumber_);
std::thread t([s]() { NotifyKafkaDisconnect(s); });
t.detach();
}
if(SessionDeleted)
WebSocketClientNotificationDeviceDisconnected(SerialNumber_);
AP_WS_Server()->DeleteConnection(State_.sessionId);
}
bool AP_WS_Connection::LookForUpgrade(const uint64_t UUID, uint64_t & UpgradedUUID) {
@@ -530,33 +562,33 @@ namespace OpenWifi {
void AP_WS_Connection::OnSocketShutdown([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
std::lock_guard Guard(Mutex_);
poco_trace(Logger_, fmt::format("SOCKET-SHUTDOWN({}): Closing.", CId_));
delete this;
return EndConnection();
}
void AP_WS_Connection::OnSocketError([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
std::lock_guard Guard(Mutex_);
poco_trace(Logger_, fmt::format("SOCKET-ERROR({}): Closing.", CId_));
delete this;
return EndConnection();
}
void AP_WS_Connection::OnSocketReadable([[maybe_unused]] const Poco::AutoPtr<Poco::Net::ReadableNotification> &pNf) {
std::lock_guard Guard(Mutex_);
if(!AP_WS_Server()->Running())
return delete this;
return EndConnection();
try {
return ProcessIncomingFrame();
} catch (const Poco::Exception &E) {
Logger_.log(E);
return delete this;
return EndConnection();
} catch (const std::exception &E) {
std::string W = E.what();
poco_information(Logger_, fmt::format("std::exception caught: {}. Connection terminated with {}", W, CId_));
return delete this;
return EndConnection();
} catch (...) {
poco_information(Logger_, fmt::format("Unknown exception for {}. Connection terminated.", CId_));
return delete this;
return EndConnection();
}
}
@@ -570,7 +602,7 @@ namespace OpenWifi {
if (IncomingSize == 0 && flags == 0 && Op == 0) {
poco_information(Logger_, fmt::format("DISCONNECT({}): device has disconnected. Session={}", CId_, State_.sessionId));
return delete this;
return EndConnection();
}
IncomingFrame.append(0);
@@ -647,7 +679,7 @@ namespace OpenWifi {
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
poco_information(Logger_,
fmt::format("CLOSE({}): Device is closing its connection.", CId_));
return delete this;
return EndConnection();
} break;
default: {
@@ -661,7 +693,7 @@ namespace OpenWifi {
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::JSON::JSONException &E) {
poco_warning(Logger_, fmt::format("JSONException({}): Text:{} Payload:{} Session:{}",
CId_,
@@ -674,59 +706,59 @@ namespace OpenWifi {
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::Net::SSLConnectionUnexpectedlyClosedException &E) {
poco_warning(Logger_, fmt::format("SSLConnectionUnexpectedlyClosedException({}): Text:{} Payload:{} Session:{}",
CId_,
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::Net::SSLException &E) {
poco_warning(Logger_, fmt::format("SSLException({}): Text:{} Payload:{} Session:{}",
CId_,
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::Net::NetException &E) {
poco_warning(Logger_, fmt::format("NetException({}): Text:{} Payload:{} Session:{}",
CId_,
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::IOException &E) {
poco_warning(Logger_, fmt::format("IOException({}): Text:{} Payload:{} Session:{}",
CId_,
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const Poco::Exception &E) {
poco_warning(Logger_, fmt::format("Exception({}): Text:{} Payload:{} Session:{}",
CId_,
E.displayText(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (const std::exception &E) {
poco_warning(Logger_, fmt::format("std::exception({}): Text:{} Payload:{} Session:{}",
CId_,
E.what(),
IncomingFrame.begin()==nullptr ? "" : IncomingFrame.begin(),
State_.sessionId));
return delete this;
return EndConnection();
} catch (...) {
poco_error(Logger_,fmt::format("UnknownException({}): Device must be disconnected. Unknown exception. Session:{}", CId_, State_.sessionId));
return delete this;
return EndConnection();
}
if (Errors_ < 10)
return;
poco_warning(Logger_, fmt::format("DISCONNECTING({}): Too many errors", CId_));
delete this;
return EndConnection();
}
bool AP_WS_Connection::Send(const std::string &Payload) {

View File

@@ -29,6 +29,7 @@ namespace OpenWifi {
Poco::Net::HTTPServerResponse &response, std::uint64_t connection_id);
~AP_WS_Connection();
void EndConnection();
void ProcessJSONRPCEvent(Poco::JSON::Object::Ptr & Doc);
void ProcessJSONRPCResult(Poco::JSON::Object::Ptr Doc);
void ProcessIncomingFrame();

View File

@@ -19,7 +19,7 @@ namespace OpenWifi {
void AP_WS_RequestHandler::handleRequest(Poco::Net::HTTPServerRequest &request,
Poco::Net::HTTPServerResponse &response) {
try {
new AP_WS_Connection(request,response,id_);
AP_WS_Server()->AddConnection(id_,std::make_shared<AP_WS_Connection>(request,response,id_));
} catch (...) {
Logger_.warning("Exception during WS creation");
}

View File

@@ -91,18 +91,30 @@ namespace OpenWifi {
[[nodiscard]] inline Poco::Net::SocketReactor & NextReactor() { return Reactor_pool_->NextReactor(); }
[[nodiscard]] inline bool Running() const { return Running_; }
private:
std::unique_ptr<Poco::Crypto::X509Certificate> IssuerCert_;
std::list<std::unique_ptr<Poco::Net::HTTPServer>> WebServers_;
Poco::Net::SocketReactor Reactor_;
Poco::Thread ReactorThread_;
std::string SimulatorId_;
Poco::ThreadPool DeviceConnectionPool_{"ws:dev-pool", 2, 32};
bool LookAtProvisioning_ = false;
bool UseDefaultConfig_ = true;
bool SimulatorEnabled_=false;
std::unique_ptr<AP_WS_ReactorThreadPool> Reactor_pool_;
std::atomic_bool Running_=false;
inline void AddConnection(std::uint64_t session_id, std::shared_ptr<AP_WS_Connection> Connection ) {
std::unique_lock Lock(LocalMutex_);
Connections_[session_id] = Connection;
}
inline void DeleteConnection(std::uint64_t session_id) {
std::unique_lock Lock(LocalMutex_);
Connections_.erase(session_id);
}
private:
std::shared_mutex LocalMutex_;
std::unique_ptr<Poco::Crypto::X509Certificate> IssuerCert_;
std::list<std::unique_ptr<Poco::Net::HTTPServer>> WebServers_;
Poco::Net::SocketReactor Reactor_;
Poco::Thread ReactorThread_;
std::string SimulatorId_;
Poco::ThreadPool DeviceConnectionPool_{"ws:dev-pool", 2, 32};
bool LookAtProvisioning_ = false;
bool UseDefaultConfig_ = true;
bool SimulatorEnabled_=false;
std::unique_ptr<AP_WS_ReactorThreadPool> Reactor_pool_;
std::atomic_bool Running_=false;
std::map<std::uint64_t, std::shared_ptr<AP_WS_Connection>> Connections_;
AP_WS_Server() noexcept:
SubSystemServer("WebSocketServer", "WS-SVR", "ucentral.websocket") {