Fix to not remove the active sessions.

Also added some more logging
This commit is contained in:
Rahul Sharma
2021-08-10 18:29:47 -04:00
parent 7216b08ac7
commit 15698e99cb

View File

@@ -48,11 +48,12 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
* To test this method:
* curl --digest --user user:password --request POST --insecure --header "Content-Type: application/json; charset=utf-8" --data '' https://localhost:7072/api/portFwd/createSession/inventoryId/dev-ap-0001/port/22/
*
* @param inventoryId
* @param connectToPortOnEquipment
* @param inventoryId: Equipment's AssetId
* @param connectToPortOnEquipment: Port to connect on the AP
* @return session id of a newly created forwarder session
*/
public String startForwarderSession(final String inventoryId, int connectToPortOnEquipment){
LOG.debug("Received create Session request for inventoryId {} on port {}", inventoryId, connectToPortOnEquipment);
//inventoryId is used to tie ForwarderSession to WebSocketSession
try {
@@ -191,7 +192,7 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
LOG.debug("[{}] Stopped polling inputstream on local socket {}", inventoryId, localSocket.getPort());
} catch (IOException e) {
LOG.error("[{}] Session {} got exception {} for the socket on port {}", inventoryId, forwarderSession.getSessionId(), e, port);
LOG.error("[{}] Session {} got IOException {} for the socket on port {}", inventoryId, forwarderSession.getSessionId(), port, e);
} finally {
//notify the other end that forwarding session is terminated
@@ -209,7 +210,7 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
localSocket.close();
forwarderSession.getServerSocket().close();
} catch (IOException e) {
//do nothing here
LOG.error("IO Exception when closing socket", e);
}
sessionIdToForwarderSessionMap.remove(forwarderSession.getSessionId());
@@ -233,11 +234,11 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
forwarderSession.setSocketStreamReaderThread(socketInputStreamReaderThread);
} catch (Exception e) {
LOG.error("[{}] error accepting conection on port {} - closing forwarding session {}", inventoryId, listenOnLocalPort, forwarderSession.getSessionId());
LOG.error("[{}] error accepting connection on port {} - closing forwarding session {}", inventoryId, listenOnLocalPort, forwarderSession.getSessionId(), e);
try {
serverSocket.close();
} catch (IOException e1) {
// do nothing here
LOG.error("IO Exception when closing socket", e1);
}
sessionIdToForwarderSessionMap.remove(forwarderSession.getSessionId());
}
@@ -278,20 +279,17 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
if(forwarderSession.getServerSocket() != null){
forwarderSession.getServerSocket().close();
LOG.debug("Closed forwarderSession server socket for sessionId {}", sessionId);
}
if(forwarderSession.getLocalSocket() != null){
forwarderSession.getLocalSocket().close();
LOG.debug("Closed forwarderSession local socket for sessionId {}", sessionId);
}
//stream reader will stop in a separate thread by themselves
sessionIdToForwarderSessionMap.remove(forwarderSession.getSessionId());
} catch (Exception e) {
// do nothing here
LOG.error("Encountered exception when closing connection for forwarder session {}", forwarderSession, e);
} catch (IOException e) {
LOG.error("Encountered IOException when closing connection for forwarder session {}", forwarderSession, e);
sessionIdToForwarderSessionMap.remove(forwarderSession.getSessionId());
}
@@ -326,7 +324,6 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
if(payload.indexOf(':')>0){
port = Integer.parseInt(payload.substring(payload.indexOf(':')+1));
}
LOG.debug("handleTextMessage: Port {} is used on Equipment {}", port, webSocketSessionKey);
//find forwarderSession by inventoryId and CEPort
ForwarderSession forwarderSession = null;
for(ForwarderSession fs: sessionIdToForwarderSessionMap.values()){
@@ -375,20 +372,18 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
//TODO: may need to have message ack for each binary packet, and do not send the next packet until previous one has been acknowledged
//DT: this has not been an issue so far
String webSocketSessionKey = getWebSocketSessionKey(session);
ByteBuffer payload = message.getPayload();
int msgPayloadLength = message.getPayloadLength();
int port = payload.getInt();
LOG.debug("handleBinaryMessage: Port {} is used on Equipment {}", port, webSocketSessionKey);
//find forwarderSession by inventoryId and CEPort
ForwarderSession forwarderSession = null;
for(ForwarderSession fs: sessionIdToForwarderSessionMap.values()){
if(fs.getInventoryId().equals(webSocketSessionKey) && fs.getConnectToPortOnEquipment()==port){
forwarderSession = fs;
@@ -425,14 +420,13 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
} else {
LOG.debug("[{}] Session {} received message that cannot be delivered because local socket is inoperable {}", webSocketSessionKey, session, message);
if (forwarderSession == null) {
LOG.debug("forwarderSession not found fpr webSocketSessionKey {}", webSocketSessionKey);
LOG.debug("forwarderSession not found for webSocketSessionKey {}", webSocketSessionKey);
} else if (forwarderSession.getLocalSocket() == null) {
LOG.debug("forwarderSession local socket is null for webSocketSessionKey {}", webSocketSessionKey);
} else {
LOG.debug("forwarderSession local socket for webSocketSessionKey {} is closed = {} and connected = {} ",
webSocketSessionKey, forwarderSession.getLocalSocket().isClosed(), forwarderSession.getLocalSocket().isConnected());
}
}
}
@@ -456,32 +450,29 @@ public class PortForwarderWebSocketHandler extends AbstractWebSocketHandler {
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
String webSocketSessionKey = getWebSocketSessionKey(session);
LOG.info("[{}] Closed portForwarder websocket connection {} : {}", webSocketSessionKey, session, closeStatus);
webSocketSessionMap.remove(webSocketSessionKey);
LOG.debug(" Removed key {} from webSocketSessionMap", webSocketSessionKey);
//close and remove all forwarder sessions for that CE
Iterator<ForwarderSession> iter = sessionIdToForwarderSessionMap.values().iterator();
while(iter.hasNext()){
ForwarderSession fs = iter.next();
if (fs.getInventoryId().equals(webSocketSessionKey)) {
LOG.debug("Closing webSocketSession for forwarderSession: {} ", fs);
if(fs.getLocalSocket()!=null && !fs.getLocalSocket().isClosed()){
fs.getLocalSocket().close();
LOG.debug("Closing local Socket for fs {}", fs);
}
if(fs.getServerSocket()!=null && !fs.getServerSocket().isClosed()){
fs.getServerSocket().close();
LOG.debug("Closing Server Socket for fs {}", fs);
try {
Iterator<ForwarderSession> iter = sessionIdToForwarderSessionMap.values().iterator();
while (iter.hasNext()) {
ForwarderSession fs = iter.next();
if (fs.getInventoryId().equals(webSocketSessionKey)) {
if (fs.getLocalSocket() != null && !fs.getLocalSocket().isClosed()) {
fs.getLocalSocket().close();
}
if (fs.getServerSocket() != null && !fs.getServerSocket().isClosed()) {
fs.getServerSocket().close();
}
iter.remove();
}
}
iter.remove();
} catch (Exception ex) {
LOG.error("Encountered exception when closing-sockets and removing sessions for Key {} ", webSocketSessionKey, ex);
}
}
private String getWebSocketSessionKey(WebSocketSession session) {