diff --git a/Dockerfile b/Dockerfile index b24fd296..93a6604b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ ARG DEBIAN_VERSION=bookworm -ARG POCO_VERSION=poco-tip-v3-tag +ARG POCO_VERSION=poco-tip-v4-tag ARG CPPKAFKA_VERSION=tip-v1 ARG VALIJASON_VERSION=tip-v1.0.2 ARG APP_NAME=owgw diff --git a/src/AP_WS_Connection.cpp b/src/AP_WS_Connection.cpp index f6473fff..257c3055 100644 --- a/src/AP_WS_Connection.cpp +++ b/src/AP_WS_Connection.cpp @@ -40,7 +40,7 @@ namespace OpenWifi { Poco::Net::HTTPServerResponse &response, uint64_t session_id, Poco::Logger &L, std::pair, std::shared_ptr> R) - : Logger_(L) { + : Logger_(L), IncomingFrame_(0) { Reactor_ = R.first; DbSession_ = R.second; @@ -55,6 +55,7 @@ namespace OpenWifi { WS_->setNoDelay(false); WS_->setKeepAlive(true); WS_->setBlocking(false); + IncomingFrame_.resize(0); uuid_ = MicroServiceRandom(std::numeric_limits::max()-1); AP_WS_Server()->IncrementConnectionCount(); @@ -601,36 +602,89 @@ namespace OpenWifi { EndConnection(); } - void AP_WS_Connection::ProcessIncomingFrame() { - Poco::Buffer IncomingFrame(0); + void AP_WS_Connection::ProcessWSFinalPayload() { + auto IncomingSize = IncomingFrame_.size(); + + IncomingFrame_.append(0); + + poco_trace(Logger_, + fmt::format("ProcessWSFrame({}): Final Frame received (len={}, Msg={}", + CId_, IncomingSize, IncomingFrame_.begin())); + + Poco::JSON::Parser parser; + auto ParsedMessage = parser.parse(IncomingFrame_.begin()); + auto IncomingJSON = ParsedMessage.extract(); + + if (IncomingJSON->has(uCentralProtocol::JSONRPC)) { + if (IncomingJSON->has(uCentralProtocol::METHOD) && + IncomingJSON->has(uCentralProtocol::PARAMS)) { + ProcessJSONRPCEvent(IncomingJSON); + } else if (IncomingJSON->has(uCentralProtocol::RESULT) && + IncomingJSON->has(uCentralProtocol::ID)) { + poco_trace(Logger_, fmt::format("RPC-RESULT({}): payload: {}", CId_, + IncomingFrame_.begin())); + ProcessJSONRPCResult(IncomingJSON); + } else { + poco_warning( + Logger_, + fmt::format("INVALID-PAYLOAD({}): Payload is not JSON-RPC 2.0: {}", + CId_, IncomingFrame_.begin())); + } + } else if (IncomingJSON->has(uCentralProtocol::RADIUS)) { + ProcessIncomingRadiusData(IncomingJSON); + } else { + std::ostringstream iS; + IncomingJSON->stringify(iS); + poco_warning( + Logger_, + fmt::format("FRAME({}): illegal transaction header, missing 'jsonrpc': {}", + CId_, iS.str())); + Errors_++; + } + IncomingFrame_.clear(); + IncomingFrame_.resize(0); + } + + void AP_WS_Connection::ProcessIncomingFrame() { + Poco::Buffer CurrentFrame(0); + bool KillConnection = false; + int flags = 0; + int IncomingSize = 0; - bool KillConnection=false; try { - int Op, flags; - auto IncomingSize = WS_->receiveFrame(IncomingFrame, flags); + IncomingSize = WS_->receiveFrame(CurrentFrame, flags); + int Op; Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; - if (IncomingSize == 0 && flags == 0 && Op == 0) { + if (IncomingSize < 0 && flags == 0) { + poco_trace(Logger_, + fmt::format("EMPTY({}): Non-blocking try-again empty frame (len={}, flags={})", + CId_, IncomingSize, flags)); + } else if (IncomingSize == 0 && flags == 0) { poco_information(Logger_, fmt::format("DISCONNECT({}): device has disconnected. Session={}", CId_, State_.sessionId)); return EndConnection(); } - IncomingFrame.append(0); - - State_.RX += IncomingSize; - AP_WS_Server()->AddRX(IncomingSize); + if (IncomingSize > 0) { + State_.RX += IncomingSize; + AP_WS_Server()->AddRX(IncomingSize); + IncomingFrame_.append(CurrentFrame); + } State_.MessageCount++; State_.LastContact = Utils::Now(); + poco_trace(Logger_, + fmt::format("FRAME({}): Frame {} rx (len={}, flags={}, acc.len={})", + CId_, Op, IncomingSize, flags, IncomingFrame_.size())); switch (Op) { case Poco::Net::WebSocket::FRAME_OP_PING: { - poco_trace(Logger_, fmt::format("WS-PING({}): received. PONG sent back.", CId_)); + poco_trace(Logger_, fmt::format("PING({}): received. PONG sent back.", CId_)); WS_->sendFrame("", 0, (int)Poco::Net::WebSocket::FRAME_OP_PONG | - (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); + (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); if (KafkaManager()->Enabled()) { Poco::JSON::Object PingObject; @@ -646,47 +700,28 @@ namespace OpenWifi { poco_trace(Logger_,fmt::format("Sending PING for {}", SerialNumber_)); KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_,PingObject); } + return; } break; case Poco::Net::WebSocket::FRAME_OP_PONG: { poco_trace(Logger_, fmt::format("PONG({}): received and ignored.", CId_)); + return; + } break; + + case Poco::Net::WebSocket::FRAME_OP_CONT: { + poco_trace(Logger_, fmt::format("CONTINUATION({}): registered.", CId_)); + } break; + + case Poco::Net::WebSocket::FRAME_OP_BINARY: { + poco_trace(Logger_, fmt::format("BINARY({}): Invalid frame type.", CId_)); + KillConnection=true; + return; } break; case Poco::Net::WebSocket::FRAME_OP_TEXT: { poco_trace(Logger_, - fmt::format("FRAME({}): Frame received (length={}, flags={}). Msg={}", - CId_, IncomingSize, flags, IncomingFrame.begin())); - - Poco::JSON::Parser parser; - auto ParsedMessage = parser.parse(IncomingFrame.begin()); - auto IncomingJSON = ParsedMessage.extract(); - - if (IncomingJSON->has(uCentralProtocol::JSONRPC)) { - if (IncomingJSON->has(uCentralProtocol::METHOD) && - IncomingJSON->has(uCentralProtocol::PARAMS)) { - ProcessJSONRPCEvent(IncomingJSON); - } else if (IncomingJSON->has(uCentralProtocol::RESULT) && - IncomingJSON->has(uCentralProtocol::ID)) { - poco_trace(Logger_, fmt::format("RPC-RESULT({}): payload: {}", CId_, - IncomingFrame.begin())); - ProcessJSONRPCResult(IncomingJSON); - } else { - poco_warning( - Logger_, - fmt::format("INVALID-PAYLOAD({}): Payload is not JSON-RPC 2.0: {}", - CId_, IncomingFrame.begin())); - } - } else if (IncomingJSON->has(uCentralProtocol::RADIUS)) { - ProcessIncomingRadiusData(IncomingJSON); - } else { - std::ostringstream iS; - IncomingJSON->stringify(iS); - poco_warning( - Logger_, - fmt::format("FRAME({}): illegal transaction header, missing 'jsonrpc': {}", - CId_, iS.str())); - Errors_++; - } + fmt::format("TEXT({}): Frame received (len={}, flags={}). Msg={}", + CId_, IncomingSize, flags, CurrentFrame.begin())); } break; case Poco::Net::WebSocket::FRAME_OP_CLOSE: { @@ -702,25 +737,31 @@ namespace OpenWifi { return; } } + + // Check for final frame and process accumlated payload + if (!KillConnection && (flags & Poco::Net::WebSocket::FRAME_FLAG_FIN) != 0) { + ProcessWSFinalPayload(); + } + } catch (const Poco::Net::ConnectionResetException &E) { poco_warning(Logger_, fmt::format("ConnectionResetException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::JSON::JSONException &E) { poco_warning(Logger_, fmt::format("JSONException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::Net::WebSocketException &E) { poco_warning(Logger_, fmt::format("WebSocketException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::Net::SSLConnectionUnexpectedlyClosedException &E) { @@ -729,42 +770,42 @@ namespace OpenWifi { fmt::format( "SSLConnectionUnexpectedlyClosedException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::Net::SSLException &E) { poco_warning(Logger_, fmt::format("SSLException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::Net::NetException &E) { poco_warning(Logger_, fmt::format("NetException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::IOException &E) { poco_warning(Logger_, fmt::format("IOException({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const Poco::Exception &E) { poco_warning(Logger_, fmt::format("Exception({}): Text:{} Payload:{} Session:{}", CId_, E.displayText(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (const std::exception &E) { poco_warning(Logger_, fmt::format("std::exception({}): Text:{} Payload:{} Session:{}", CId_, E.what(), - IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(), + CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(), State_.sessionId)); KillConnection=true; } catch (...) { @@ -777,7 +818,9 @@ namespace OpenWifi { if (!KillConnection && Errors_ < 10) return; - poco_warning(Logger_, fmt::format("DISCONNECTING({}): ConnectionException: {} Errors: {}", CId_, KillConnection, Errors_ )); + poco_warning(Logger_, + fmt::format("DISCONNECTING({}): ConnectionException: {} Errors: {}", + CId_, KillConnection, Errors_ )); EndConnection(); } diff --git a/src/AP_WS_Connection.h b/src/AP_WS_Connection.h index a60c3e78..f3732242 100644 --- a/src/AP_WS_Connection.h +++ b/src/AP_WS_Connection.h @@ -34,6 +34,7 @@ namespace OpenWifi { void EndConnection(); void ProcessJSONRPCEvent(Poco::JSON::Object::Ptr &Doc); void ProcessJSONRPCResult(Poco::JSON::Object::Ptr Doc); + void ProcessWSFinalPayload(); void ProcessIncomingFrame(); void ProcessIncomingRadiusData(const Poco::JSON::Object::Ptr &Doc); @@ -146,6 +147,7 @@ namespace OpenWifi { std::uint64_t uuid_=0; bool Simulated_=false; std::atomic_uint64_t LastContact_=0; + Poco::Buffer IncomingFrame_; static inline std::atomic_uint64_t ConcurrentStartingDevices_ = 0; diff --git a/src/TelemetryClient.cpp b/src/TelemetryClient.cpp index 3fc92f49..aea4c323 100644 --- a/src/TelemetryClient.cpp +++ b/src/TelemetryClient.cpp @@ -120,14 +120,16 @@ namespace OpenWifi { Poco::Buffer IncomingFrame(0); try { - int Op, flags; - int IncomingSize; + int Op, flags, IncomingSize; + IncomingSize = WS_->receiveFrame(IncomingFrame, flags); Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; - if (IncomingSize == 0 && flags == 0 && Op == 0) { - poco_information( - Logger(), + if (IncomingSize == -1) { + poco_trace(Logger(), + fmt::format("TELEMETRY-EMPTY({}): Empty frame, non-blocking try-again.", CId_)); + } else if (IncomingSize == 0 && flags == 0 && Op == 0) { + poco_information(Logger(), fmt::format("TELEMETRY-DISCONNECT({}): device has disconnected.", CId_)); MustDisconnect = true; } else { @@ -138,10 +140,12 @@ namespace OpenWifi { (int)Poco::Net::WebSocket::FRAME_OP_PONG | (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); } else if (Op == Poco::Net::WebSocket::FRAME_OP_CLOSE) { - poco_information( - Logger(), + poco_information(Logger(), fmt::format("TELEMETRY-DISCONNECT({}): device wants to disconnect.", CId_)); MustDisconnect = true; + } else if (Op == Poco::Net::WebSocket::FRAME_OP_CONT) { + poco_information(Logger(), + fmt::format("TELEMETRY-CONT({}): rx {} bytes.", CId_, IncomingSize)); } } } catch (...) { @@ -154,4 +158,4 @@ namespace OpenWifi { SendTelemetryShutdown(); } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi diff --git a/src/framework/MicroService.cpp b/src/framework/MicroService.cpp index 71a187ec..05a68995 100644 --- a/src/framework/MicroService.cpp +++ b/src/framework/MicroService.cpp @@ -295,6 +295,7 @@ namespace OpenWifi { auto Level = Poco::Logger::parseLevel( MicroService::instance().ConfigGetString("logging.level", "debug")); + //Level = Poco::Logger::parseLevel("trace"); // TODO: remove this Poco::Logger::root().setLevel(Level); if (!DisableWebSocketLogging) { static const UI_WebSocketClientServer::NotificationTypeIdVec Notifications = { @@ -845,4 +846,4 @@ namespace OpenWifi { } } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi diff --git a/src/framework/SubSystemServer.cpp b/src/framework/SubSystemServer.cpp index 5be9c9d3..a543d1b4 100644 --- a/src/framework/SubSystemServer.cpp +++ b/src/framework/SubSystemServer.cpp @@ -343,4 +343,4 @@ namespace OpenWifi { } } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi diff --git a/src/framework/UI_WebSocketClientServer.cpp b/src/framework/UI_WebSocketClientServer.cpp index eb5b0b7d..b7b5211d 100644 --- a/src/framework/UI_WebSocketClientServer.cpp +++ b/src/framework/UI_WebSocketClientServer.cpp @@ -210,10 +210,16 @@ namespace OpenWifi { n = Client->second->WS_->receiveFrame(IncomingFrame, flags); auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; + if (n == -1) { + poco_warning(Logger(), + fmt::format("UI-EMPTY({}): {} Empty Frame flags {}.", + Client->second->Id_, Client->second->UserName_, flags)); + return; + } if (n == 0) { poco_debug(Logger(), - fmt::format("CLOSE({}): {} UI Client is closing WS connection.", - Client->second->Id_, Client->second->UserName_)); + fmt::format("CLOSE({}): {} UI Client is closing WS connection.", + Client->second->Id_, Client->second->UserName_)); return EndConnection(Client); } @@ -221,7 +227,7 @@ namespace OpenWifi { case Poco::Net::WebSocket::FRAME_OP_PING: { Client->second->WS_->sendFrame("", 0, (int)Poco::Net::WebSocket::FRAME_OP_PONG | - (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); + (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); } break; case Poco::Net::WebSocket::FRAME_OP_PONG: { } break; @@ -231,6 +237,11 @@ namespace OpenWifi { Client->second->Id_, Client->second->UserName_)); return EndConnection(Client); } break; + case Poco::Net::WebSocket::FRAME_OP_CONT: { + poco_warning(Logger(), + fmt::format("CONT({}): {} Unexpected CONT Frame - Ignoring.", + Client->second->Id_, Client->second->UserName_)); + } break; case Poco::Net::WebSocket::FRAME_OP_TEXT: { constexpr const char *DropMessagesCommand = "drop-notifications"; IncomingFrame.append(0); @@ -319,4 +330,4 @@ namespace OpenWifi { } } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi diff --git a/src/rttys/RTTYS_server.cpp b/src/rttys/RTTYS_server.cpp index 31f89ba5..87677143 100644 --- a/src/rttys/RTTYS_server.cpp +++ b/src/rttys/RTTYS_server.cpp @@ -580,14 +580,16 @@ namespace OpenWifi { try { Client = Clients_.find(pNf->socket().impl()->sockfd()); if (Client == end(Clients_)) { - poco_warning(Logger(), fmt::format("Cannot find client socket: {}", - pNf->socket().impl()->sockfd())); + poco_warning(Logger(), + fmt::format("Cannot find client socket: {}", + pNf->socket().impl()->sockfd())); return; } Connection = Client->second; if(Connection->WSSocket_==nullptr || Connection->WSSocket_->impl()==nullptr) { - poco_warning(Logger(), fmt::format("WebSocket is no valid: {}", - Connection->SerialNumber_)); + poco_warning(Logger(), + fmt::format("WebSocket is not valid: {}", + Connection->SerialNumber_)); return; } @@ -596,15 +598,25 @@ namespace OpenWifi { auto ReceivedBytes = Connection->WSSocket_->receiveFrame(FrameBuffer, sizeof(FrameBuffer), flags); auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK; + + if (ReceivedBytes == -1) { + poco_trace(Logger(), + fmt::format("WS-EMPTY{}: Non-blocking try-again empty Frame: flags {}", + Connection->SerialNumber_, flags)); + return; + } + switch (Op) { case Poco::Net::WebSocket::FRAME_OP_PING: { Connection->WSSocket_->sendFrame("", 0, - (int)Poco::Net::WebSocket::FRAME_OP_PONG | - (int)Poco::Net::WebSocket::FRAME_FLAG_FIN); + (int) Poco::Net::WebSocket::FRAME_FLAG_FIN | + (int) Poco::Net::WebSocket::FRAME_OP_BINARY); } break; + case Poco::Net::WebSocket::FRAME_OP_PONG: { } break; + case Poco::Net::WebSocket::FRAME_OP_TEXT: { if (ReceivedBytes == 0) { EndConnection(Connection,__func__,__LINE__); @@ -631,19 +643,29 @@ namespace OpenWifi { } } } break; + case Poco::Net::WebSocket::FRAME_OP_BINARY: { if (ReceivedBytes == 0) { EndConnection(Connection,__func__,__LINE__); return; } else { poco_trace(Logger(), - fmt::format("Sending {} key strokes to device.", ReceivedBytes)); + fmt::format("Sending {} key strokes to device.", ReceivedBytes)); if (!RTTYS_server().KeyStrokes(Connection, FrameBuffer, ReceivedBytes)) { EndConnection(Connection,__func__,__LINE__); return; } } } break; + + case Poco::Net::WebSocket::FRAME_OP_CONT: { + // may have to handle this, but not sure whether it's a continuation for text or + // binary, seems to be a hole in the protocol. + poco_warning(Logger(), + fmt::format("CONT Frame {} received, ignoring for now.", + ReceivedBytes)); + } + case Poco::Net::WebSocket::FRAME_OP_CLOSE: { EndConnection(Connection,__func__,__LINE__); return; @@ -689,8 +711,8 @@ namespace OpenWifi { if (Connection->WSSocket_ != nullptr && Connection->WSSocket_->impl()!= nullptr) { try { Connection->WSSocket_->sendFrame(Buf, len, - Poco::Net::WebSocket::FRAME_FLAG_FIN | - Poco::Net::WebSocket::FRAME_OP_BINARY); + (int) Poco::Net::WebSocket::FRAME_FLAG_FIN | + (int) Poco::Net::WebSocket::FRAME_OP_BINARY); return; } catch (...) { poco_error(Logger(), "SendData shutdown."); @@ -992,8 +1014,9 @@ namespace OpenWifi { } bool RTTYS_server::SendToClient(Poco::Net::WebSocket &WebSocket, const u_char *Buf, int len) { - WebSocket.sendFrame( - Buf, len, Poco::Net::WebSocket::FRAME_FLAG_FIN | Poco::Net::WebSocket::FRAME_OP_BINARY); + WebSocket.sendFrame(Buf, len, + (int) Poco::Net::WebSocket::FRAME_FLAG_FIN | + (int) Poco::Net::WebSocket::FRAME_OP_BINARY); return true; }