Work with new WS behaviour

Signed-off-by: Carsten Schafer <Carsten.Schafer@kinarasystems.com>
This commit is contained in:
Carsten Schafer
2025-08-22 13:30:07 -04:00
parent 18b169e517
commit 66d580d047
8 changed files with 166 additions and 82 deletions

View File

@@ -1,5 +1,5 @@
ARG DEBIAN_VERSION=bookworm
ARG POCO_VERSION=poco-tip-v3-tag
ARG POCO_VERSION=poco-tip-v4-tag
ARG CPPKAFKA_VERSION=tip-v1
ARG VALIJASON_VERSION=tip-v1.0.2
ARG APP_NAME=owgw

View File

@@ -40,7 +40,7 @@ namespace OpenWifi {
Poco::Net::HTTPServerResponse &response,
uint64_t session_id, Poco::Logger &L,
std::pair<std::shared_ptr<Poco::Net::SocketReactor>, std::shared_ptr<LockedDbSession>> R)
: Logger_(L) {
: Logger_(L), IncomingFrame_(0) {
Reactor_ = R.first;
DbSession_ = R.second;
@@ -55,6 +55,7 @@ namespace OpenWifi {
WS_->setNoDelay(false);
WS_->setKeepAlive(true);
WS_->setBlocking(false);
IncomingFrame_.resize(0);
uuid_ = MicroServiceRandom(std::numeric_limits<std::uint64_t>::max()-1);
AP_WS_Server()->IncrementConnectionCount();
@@ -601,36 +602,89 @@ namespace OpenWifi {
EndConnection();
}
void AP_WS_Connection::ProcessIncomingFrame() {
Poco::Buffer<char> IncomingFrame(0);
void AP_WS_Connection::ProcessWSFinalPayload() {
auto IncomingSize = IncomingFrame_.size();
IncomingFrame_.append(0);
poco_trace(Logger_,
fmt::format("ProcessWSFrame({}): Final Frame received (len={}, Msg={}",
CId_, IncomingSize, IncomingFrame_.begin()));
Poco::JSON::Parser parser;
auto ParsedMessage = parser.parse(IncomingFrame_.begin());
auto IncomingJSON = ParsedMessage.extract<Poco::JSON::Object::Ptr>();
if (IncomingJSON->has(uCentralProtocol::JSONRPC)) {
if (IncomingJSON->has(uCentralProtocol::METHOD) &&
IncomingJSON->has(uCentralProtocol::PARAMS)) {
ProcessJSONRPCEvent(IncomingJSON);
} else if (IncomingJSON->has(uCentralProtocol::RESULT) &&
IncomingJSON->has(uCentralProtocol::ID)) {
poco_trace(Logger_, fmt::format("RPC-RESULT({}): payload: {}", CId_,
IncomingFrame_.begin()));
ProcessJSONRPCResult(IncomingJSON);
} else {
poco_warning(
Logger_,
fmt::format("INVALID-PAYLOAD({}): Payload is not JSON-RPC 2.0: {}",
CId_, IncomingFrame_.begin()));
}
} else if (IncomingJSON->has(uCentralProtocol::RADIUS)) {
ProcessIncomingRadiusData(IncomingJSON);
} else {
std::ostringstream iS;
IncomingJSON->stringify(iS);
poco_warning(
Logger_,
fmt::format("FRAME({}): illegal transaction header, missing 'jsonrpc': {}",
CId_, iS.str()));
Errors_++;
}
IncomingFrame_.clear();
IncomingFrame_.resize(0);
}
void AP_WS_Connection::ProcessIncomingFrame() {
Poco::Buffer<char> CurrentFrame(0);
bool KillConnection = false;
int flags = 0;
int IncomingSize = 0;
bool KillConnection=false;
try {
int Op, flags;
auto IncomingSize = WS_->receiveFrame(IncomingFrame, flags);
IncomingSize = WS_->receiveFrame(CurrentFrame, flags);
int Op;
Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if (IncomingSize == 0 && flags == 0 && Op == 0) {
if (IncomingSize < 0 && flags == 0) {
poco_trace(Logger_,
fmt::format("EMPTY({}): Non-blocking try-again empty frame (len={}, flags={})",
CId_, IncomingSize, flags));
} else if (IncomingSize == 0 && flags == 0) {
poco_information(Logger_,
fmt::format("DISCONNECT({}): device has disconnected. Session={}",
CId_, State_.sessionId));
return EndConnection();
}
IncomingFrame.append(0);
State_.RX += IncomingSize;
AP_WS_Server()->AddRX(IncomingSize);
if (IncomingSize > 0) {
State_.RX += IncomingSize;
AP_WS_Server()->AddRX(IncomingSize);
IncomingFrame_.append(CurrentFrame);
}
State_.MessageCount++;
State_.LastContact = Utils::Now();
poco_trace(Logger_,
fmt::format("FRAME({}): Frame {} rx (len={}, flags={}, acc.len={})",
CId_, Op, IncomingSize, flags, IncomingFrame_.size()));
switch (Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: {
poco_trace(Logger_, fmt::format("WS-PING({}): received. PONG sent back.", CId_));
poco_trace(Logger_, fmt::format("PING({}): received. PONG sent back.", CId_));
WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
if (KafkaManager()->Enabled()) {
Poco::JSON::Object PingObject;
@@ -646,47 +700,28 @@ namespace OpenWifi {
poco_trace(Logger_,fmt::format("Sending PING for {}", SerialNumber_));
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_,PingObject);
}
return;
} break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
poco_trace(Logger_, fmt::format("PONG({}): received and ignored.", CId_));
return;
} break;
case Poco::Net::WebSocket::FRAME_OP_CONT: {
poco_trace(Logger_, fmt::format("CONTINUATION({}): registered.", CId_));
} break;
case Poco::Net::WebSocket::FRAME_OP_BINARY: {
poco_trace(Logger_, fmt::format("BINARY({}): Invalid frame type.", CId_));
KillConnection=true;
return;
} break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
poco_trace(Logger_,
fmt::format("FRAME({}): Frame received (length={}, flags={}). Msg={}",
CId_, IncomingSize, flags, IncomingFrame.begin()));
Poco::JSON::Parser parser;
auto ParsedMessage = parser.parse(IncomingFrame.begin());
auto IncomingJSON = ParsedMessage.extract<Poco::JSON::Object::Ptr>();
if (IncomingJSON->has(uCentralProtocol::JSONRPC)) {
if (IncomingJSON->has(uCentralProtocol::METHOD) &&
IncomingJSON->has(uCentralProtocol::PARAMS)) {
ProcessJSONRPCEvent(IncomingJSON);
} else if (IncomingJSON->has(uCentralProtocol::RESULT) &&
IncomingJSON->has(uCentralProtocol::ID)) {
poco_trace(Logger_, fmt::format("RPC-RESULT({}): payload: {}", CId_,
IncomingFrame.begin()));
ProcessJSONRPCResult(IncomingJSON);
} else {
poco_warning(
Logger_,
fmt::format("INVALID-PAYLOAD({}): Payload is not JSON-RPC 2.0: {}",
CId_, IncomingFrame.begin()));
}
} else if (IncomingJSON->has(uCentralProtocol::RADIUS)) {
ProcessIncomingRadiusData(IncomingJSON);
} else {
std::ostringstream iS;
IncomingJSON->stringify(iS);
poco_warning(
Logger_,
fmt::format("FRAME({}): illegal transaction header, missing 'jsonrpc': {}",
CId_, iS.str()));
Errors_++;
}
fmt::format("TEXT({}): Frame received (len={}, flags={}). Msg={}",
CId_, IncomingSize, flags, CurrentFrame.begin()));
} break;
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
@@ -702,25 +737,31 @@ namespace OpenWifi {
return;
}
}
// Check for final frame and process accumlated payload
if (!KillConnection && (flags & Poco::Net::WebSocket::FRAME_FLAG_FIN) != 0) {
ProcessWSFinalPayload();
}
} catch (const Poco::Net::ConnectionResetException &E) {
poco_warning(Logger_,
fmt::format("ConnectionResetException({}): Text:{} Payload:{} Session:{}",
CId_, E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::JSON::JSONException &E) {
poco_warning(Logger_,
fmt::format("JSONException({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::Net::WebSocketException &E) {
poco_warning(Logger_,
fmt::format("WebSocketException({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::Net::SSLConnectionUnexpectedlyClosedException &E) {
@@ -729,42 +770,42 @@ namespace OpenWifi {
fmt::format(
"SSLConnectionUnexpectedlyClosedException({}): Text:{} Payload:{} Session:{}",
CId_, E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::Net::SSLException &E) {
poco_warning(Logger_,
fmt::format("SSLException({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::Net::NetException &E) {
poco_warning(Logger_,
fmt::format("NetException({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::IOException &E) {
poco_warning(Logger_,
fmt::format("IOException({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const Poco::Exception &E) {
poco_warning(Logger_,
fmt::format("Exception({}): Text:{} Payload:{} Session:{}", CId_,
E.displayText(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (const std::exception &E) {
poco_warning(Logger_,
fmt::format("std::exception({}): Text:{} Payload:{} Session:{}", CId_,
E.what(),
IncomingFrame.begin() == nullptr ? "" : IncomingFrame.begin(),
CurrentFrame.begin() == nullptr ? "" : CurrentFrame.begin(),
State_.sessionId));
KillConnection=true;
} catch (...) {
@@ -777,7 +818,9 @@ namespace OpenWifi {
if (!KillConnection && Errors_ < 10)
return;
poco_warning(Logger_, fmt::format("DISCONNECTING({}): ConnectionException: {} Errors: {}", CId_, KillConnection, Errors_ ));
poco_warning(Logger_,
fmt::format("DISCONNECTING({}): ConnectionException: {} Errors: {}",
CId_, KillConnection, Errors_ ));
EndConnection();
}

View File

@@ -34,6 +34,7 @@ namespace OpenWifi {
void EndConnection();
void ProcessJSONRPCEvent(Poco::JSON::Object::Ptr &Doc);
void ProcessJSONRPCResult(Poco::JSON::Object::Ptr Doc);
void ProcessWSFinalPayload();
void ProcessIncomingFrame();
void ProcessIncomingRadiusData(const Poco::JSON::Object::Ptr &Doc);
@@ -146,6 +147,7 @@ namespace OpenWifi {
std::uint64_t uuid_=0;
bool Simulated_=false;
std::atomic_uint64_t LastContact_=0;
Poco::Buffer<char> IncomingFrame_;
static inline std::atomic_uint64_t ConcurrentStartingDevices_ = 0;

View File

@@ -120,14 +120,16 @@ namespace OpenWifi {
Poco::Buffer<char> IncomingFrame(0);
try {
int Op, flags;
int IncomingSize;
int Op, flags, IncomingSize;
IncomingSize = WS_->receiveFrame(IncomingFrame, flags);
Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if (IncomingSize == 0 && flags == 0 && Op == 0) {
poco_information(
Logger(),
if (IncomingSize == -1) {
poco_trace(Logger(),
fmt::format("TELEMETRY-EMPTY({}): Empty frame, non-blocking try-again.", CId_));
} else if (IncomingSize == 0 && flags == 0 && Op == 0) {
poco_information(Logger(),
fmt::format("TELEMETRY-DISCONNECT({}): device has disconnected.", CId_));
MustDisconnect = true;
} else {
@@ -138,10 +140,12 @@ namespace OpenWifi {
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
} else if (Op == Poco::Net::WebSocket::FRAME_OP_CLOSE) {
poco_information(
Logger(),
poco_information(Logger(),
fmt::format("TELEMETRY-DISCONNECT({}): device wants to disconnect.", CId_));
MustDisconnect = true;
} else if (Op == Poco::Net::WebSocket::FRAME_OP_CONT) {
poco_information(Logger(),
fmt::format("TELEMETRY-CONT({}): rx {} bytes.", CId_, IncomingSize));
}
}
} catch (...) {

View File

@@ -295,6 +295,7 @@ namespace OpenWifi {
auto Level = Poco::Logger::parseLevel(
MicroService::instance().ConfigGetString("logging.level", "debug"));
//Level = Poco::Logger::parseLevel("trace"); // TODO: remove this
Poco::Logger::root().setLevel(Level);
if (!DisableWebSocketLogging) {
static const UI_WebSocketClientServer::NotificationTypeIdVec Notifications = {

View File

@@ -210,10 +210,16 @@ namespace OpenWifi {
n = Client->second->WS_->receiveFrame(IncomingFrame, flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if (n == -1) {
poco_warning(Logger(),
fmt::format("UI-EMPTY({}): {} Empty Frame flags {}.",
Client->second->Id_, Client->second->UserName_, flags));
return;
}
if (n == 0) {
poco_debug(Logger(),
fmt::format("CLOSE({}): {} UI Client is closing WS connection.",
Client->second->Id_, Client->second->UserName_));
fmt::format("CLOSE({}): {} UI Client is closing WS connection.",
Client->second->Id_, Client->second->UserName_));
return EndConnection(Client);
}
@@ -221,7 +227,7 @@ namespace OpenWifi {
case Poco::Net::WebSocket::FRAME_OP_PING: {
Client->second->WS_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
} break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
} break;
@@ -231,6 +237,11 @@ namespace OpenWifi {
Client->second->Id_, Client->second->UserName_));
return EndConnection(Client);
} break;
case Poco::Net::WebSocket::FRAME_OP_CONT: {
poco_warning(Logger(),
fmt::format("CONT({}): {} Unexpected CONT Frame - Ignoring.",
Client->second->Id_, Client->second->UserName_));
} break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
constexpr const char *DropMessagesCommand = "drop-notifications";
IncomingFrame.append(0);

View File

@@ -580,14 +580,16 @@ namespace OpenWifi {
try {
Client = Clients_.find(pNf->socket().impl()->sockfd());
if (Client == end(Clients_)) {
poco_warning(Logger(), fmt::format("Cannot find client socket: {}",
pNf->socket().impl()->sockfd()));
poco_warning(Logger(),
fmt::format("Cannot find client socket: {}",
pNf->socket().impl()->sockfd()));
return;
}
Connection = Client->second;
if(Connection->WSSocket_==nullptr || Connection->WSSocket_->impl()==nullptr) {
poco_warning(Logger(), fmt::format("WebSocket is no valid: {}",
Connection->SerialNumber_));
poco_warning(Logger(),
fmt::format("WebSocket is not valid: {}",
Connection->SerialNumber_));
return;
}
@@ -596,15 +598,25 @@ namespace OpenWifi {
auto ReceivedBytes = Connection->WSSocket_->receiveFrame(FrameBuffer, sizeof(FrameBuffer), flags);
auto Op = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
if (ReceivedBytes == -1) {
poco_trace(Logger(),
fmt::format("WS-EMPTY{}: Non-blocking try-again empty Frame: flags {}",
Connection->SerialNumber_, flags));
return;
}
switch (Op) {
case Poco::Net::WebSocket::FRAME_OP_PING: {
Connection->WSSocket_->sendFrame("", 0,
(int)Poco::Net::WebSocket::FRAME_OP_PONG |
(int)Poco::Net::WebSocket::FRAME_FLAG_FIN);
(int) Poco::Net::WebSocket::FRAME_FLAG_FIN |
(int) Poco::Net::WebSocket::FRAME_OP_BINARY);
} break;
case Poco::Net::WebSocket::FRAME_OP_PONG: {
} break;
case Poco::Net::WebSocket::FRAME_OP_TEXT: {
if (ReceivedBytes == 0) {
EndConnection(Connection,__func__,__LINE__);
@@ -631,19 +643,29 @@ namespace OpenWifi {
}
}
} break;
case Poco::Net::WebSocket::FRAME_OP_BINARY: {
if (ReceivedBytes == 0) {
EndConnection(Connection,__func__,__LINE__);
return;
} else {
poco_trace(Logger(),
fmt::format("Sending {} key strokes to device.", ReceivedBytes));
fmt::format("Sending {} key strokes to device.", ReceivedBytes));
if (!RTTYS_server().KeyStrokes(Connection, FrameBuffer, ReceivedBytes)) {
EndConnection(Connection,__func__,__LINE__);
return;
}
}
} break;
case Poco::Net::WebSocket::FRAME_OP_CONT: {
// may have to handle this, but not sure whether it's a continuation for text or
// binary, seems to be a hole in the protocol.
poco_warning(Logger(),
fmt::format("CONT Frame {} received, ignoring for now.",
ReceivedBytes));
}
case Poco::Net::WebSocket::FRAME_OP_CLOSE: {
EndConnection(Connection,__func__,__LINE__);
return;
@@ -689,8 +711,8 @@ namespace OpenWifi {
if (Connection->WSSocket_ != nullptr && Connection->WSSocket_->impl()!= nullptr) {
try {
Connection->WSSocket_->sendFrame(Buf, len,
Poco::Net::WebSocket::FRAME_FLAG_FIN |
Poco::Net::WebSocket::FRAME_OP_BINARY);
(int) Poco::Net::WebSocket::FRAME_FLAG_FIN |
(int) Poco::Net::WebSocket::FRAME_OP_BINARY);
return;
} catch (...) {
poco_error(Logger(), "SendData shutdown.");
@@ -992,8 +1014,9 @@ namespace OpenWifi {
}
bool RTTYS_server::SendToClient(Poco::Net::WebSocket &WebSocket, const u_char *Buf, int len) {
WebSocket.sendFrame(
Buf, len, Poco::Net::WebSocket::FRAME_FLAG_FIN | Poco::Net::WebSocket::FRAME_OP_BINARY);
WebSocket.sendFrame(Buf, len,
(int) Poco::Net::WebSocket::FRAME_FLAG_FIN |
(int) Poco::Net::WebSocket::FRAME_OP_BINARY);
return true;
}