mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-10-31 02:27:49 +00:00 
			
		
		
		
	[WIFI-7823] Adding ctxRoutingId handling in connect/disconnect paths to determine true disconnect
This commit is contained in:
		| @@ -25,6 +25,8 @@ import java.util.regex.Matcher; | ||||
| import java.util.regex.Pattern; | ||||
| import java.util.stream.Collectors; | ||||
| import java.util.Arrays; | ||||
| import java.util.Collections; | ||||
| import java.util.Comparator; | ||||
|  | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| @@ -886,23 +888,17 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void apDisconnected(String apId) { | ||||
|         LOG.info("AP {} got disconnected from the gateway", apId); | ||||
|     public void apDisconnected(String apId, Long ctxRoutingId) { | ||||
|         LOG.info("AP {} got disconnected from the gateway, remove ctxRoutingId {} ", apId, ctxRoutingId); | ||||
|         try { | ||||
|  | ||||
|             OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId); | ||||
|  | ||||
|             if (ovsdbSession != null) { | ||||
|                 if (ovsdbSession.getRoutingId() > 0L) { | ||||
|                     try { | ||||
|                         routingServiceInterface.delete(ovsdbSession.getRoutingId()); | ||||
|                     } catch (Exception e) { | ||||
|                         LOG.warn("Unable to delete routing service Id {} for ap {}. {}", ovsdbSession.getRoutingId(), apId, e); | ||||
|                     } | ||||
|             if (ctxRoutingId != null && ctxRoutingId > 0L) { | ||||
|                 try { | ||||
|                     routingServiceInterface.delete(ctxRoutingId); | ||||
|                 } catch (Exception e) { | ||||
|                     LOG.warn("Unable to delete routing service Id {} for ap {}. {}", ctxRoutingId, apId, e); | ||||
|                 } | ||||
|             } else { | ||||
|                 LOG.warn("Cannot find ap {} in inventory", apId); | ||||
|             } | ||||
|  | ||||
|             Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId); | ||||
|             if (ce != null) { | ||||
|                 List<Status> deletedStatuses = statusServiceInterface.deleteOnEquipmentDisconnect(ce.getCustomerId(), ce.getId()); | ||||
| @@ -918,6 +914,47 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
|         } | ||||
|  | ||||
|     } | ||||
|      | ||||
|     @Override | ||||
|     public Long getLatestRoutingId(String apId) { | ||||
|         try { | ||||
|             Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId); | ||||
|             if (ce != null) { | ||||
|                 List<EquipmentRoutingRecord> recordList = routingServiceInterface.getRegisteredRouteList(ce.getId()); | ||||
|                 LOG.info("{} routing records found for AP {}, sort and return latest routingRecordId", recordList.size(), apId); | ||||
|                 if (!recordList.isEmpty()) { | ||||
|                     // Sort by latest record first | ||||
|                     Collections.sort(recordList, new Comparator<EquipmentRoutingRecord>() { | ||||
|                         @Override | ||||
|                         public int compare(EquipmentRoutingRecord o1, EquipmentRoutingRecord o2) { | ||||
|                             return Long.compare(o2.getLastModifiedTimestamp(), o1.getLastModifiedTimestamp()); | ||||
|                         } | ||||
|                     }); | ||||
|                     return recordList.get(0).getId(); | ||||
|                 } else { | ||||
|                     LOG.debug("No records found for AP {} equipmentId {}", apId, ce.getId()); | ||||
|                     return null; | ||||
|                 } | ||||
|             } else { | ||||
|                 // Equipment doesn't exist, nothing to clean up | ||||
|                 LOG.debug("No equipment found for AP {} ", apId); | ||||
|                 return null; | ||||
|             } | ||||
|         } catch (Exception e) { | ||||
|             LOG.error("Exception when registering ap routing {}", apId, e); | ||||
|             // Equipment doesn't exist, nothing to clean up | ||||
|             return null; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void removeRoutingRecord(Long routingId) { | ||||
|         try { | ||||
|             routingServiceInterface.delete(routingId); | ||||
|         } catch (Exception e) { | ||||
|             LOG.warn("Unable to delete routing service Id {}. {}", routingId, e); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private void updateApDisconnectedStatus(String apId, Equipment ce) { | ||||
|         LOG.info("updateApDisconnectedStatus disconnected AP {}", apId); | ||||
|   | ||||
| @@ -394,7 +394,7 @@ public class OpensyncExternalIntegrationCloudTest { | ||||
|  | ||||
|     @Test | ||||
|     public void testApDisconnected() { | ||||
|         opensyncExternalIntegrationCloud.apDisconnected("Test_Client_21P10C68818122"); | ||||
|         opensyncExternalIntegrationCloud.apDisconnected("Test_Client_21P10C68818122", 0L); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|   | ||||
| @@ -22,7 +22,11 @@ public interface OpensyncExternalIntegrationInterface { | ||||
|         INIT, INSERT, DELETE, MODIFY | ||||
|     } | ||||
|  | ||||
|     void apDisconnected(String apId); | ||||
|     void apDisconnected(String apId, Long ctxRoutingId); | ||||
|      | ||||
|     Long getLatestRoutingId(String apId); | ||||
|      | ||||
|     void removeRoutingRecord(Long routingId); | ||||
|  | ||||
|     OpensyncAPConfig getApConfig(String apId); | ||||
|  | ||||
|   | ||||
| @@ -64,7 +64,7 @@ public class OpensyncExternalIntegrationSimple implements OpensyncExternalIntegr | ||||
|         LOG.info("ConnectNodeInfo {}", connectNodeInfo); | ||||
|     } | ||||
|  | ||||
|     public void apDisconnected(String apId) { | ||||
|     public void apDisconnected(String apId, Long ctxRoutingId) { | ||||
|         LOG.info("AP {} got disconnected from the gateway", apId); | ||||
|     } | ||||
|  | ||||
| @@ -222,4 +222,15 @@ public class OpensyncExternalIntegrationSimple implements OpensyncExternalIntegr | ||||
|     public void updateConfigVersionInStatus(String apId, long configVersionFromProfiles) { | ||||
|         // do nothing here | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public Long getLatestRoutingId(String apId) { | ||||
|         LOG.info("getLatestRoutingId for AP {}", apId); | ||||
|         return null; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void removeRoutingRecord(Long routingId) { | ||||
|         LOG.info("removeRoutingRecord for routingId {}", routingId);         | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import com.telecominfraproject.wlan.opensync.external.integration.models.*; | ||||
| import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao; | ||||
| import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics; | ||||
| import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbMetrics; | ||||
| import com.telecominfraproject.wlan.opensync.util.OvsdbClientUtil; | ||||
| import com.telecominfraproject.wlan.opensync.util.OvsdbStringConstants; | ||||
| import com.telecominfraproject.wlan.opensync.util.SslUtil; | ||||
| import com.vmware.ovsdb.callback.ConnectionCallback; | ||||
| @@ -141,26 +142,32 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { | ||||
| 					subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName(); | ||||
|  | ||||
|                     String clientCn = SslUtil.extractCN(subjectDn); | ||||
|                     LOG.info("ovsdbClient connecting from {} on port {} clientCn {}", remoteHost, localPort, clientCn); | ||||
|  | ||||
|                     ConnectNodeInfo connectNodeInfo = ovsdbDao.getConnectNodeInfo(ovsdbClient); | ||||
|  | ||||
|                     // successfully connected - register it in our | ||||
|                     // connectedClients table | ||||
|  | ||||
|                     String key = alterClientCnIfRequired(clientCn, connectNodeInfo); | ||||
|                     ovsdbSessionMapInterface.newSession(key, ovsdbClient); | ||||
|  | ||||
|                     extIntegrationInterface.apConnected(key, connectNodeInfo); | ||||
|  | ||||
|                     processConnectRequest(ovsdbClient, clientCn, connectNodeInfo); | ||||
|  | ||||
|                     monitorOvsdbStateTables(ovsdbClient, key); | ||||
|  | ||||
|                     connectionsCreated.increment(); | ||||
|                     LOG.info("ovsdbClient connected from {} on port {} AP {} ", remoteHost, localPort, key); | ||||
|                     LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions()); | ||||
|  | ||||
|                     if (clientCn != null && !clientCn.isEmpty()) { | ||||
|                         LOG.info("ovsdbClient connecting from {} on port {} clientCn {}", remoteHost, localPort, clientCn); | ||||
|      | ||||
|                         ConnectNodeInfo connectNodeInfo = ovsdbDao.getConnectNodeInfo(ovsdbClient); | ||||
|      | ||||
|                         // successfully connected - register it in our | ||||
|                         // connectedClients table | ||||
|      | ||||
|                         String key = alterClientCnIfRequired(clientCn, connectNodeInfo); | ||||
|                         OvsdbSession ovsdbSession = ovsdbSessionMapInterface.newSession(key, ovsdbClient); | ||||
|                         extIntegrationInterface.apConnected(key, connectNodeInfo); | ||||
|                         // DT: at this point the routing Id is associated with the session, let's store it into the | ||||
|                         // connectionInfo object so that the disconnect handler has access to it | ||||
|                         OvsdbClientUtil.setRoutingId(ovsdbClient, ovsdbSession.getRoutingId()); | ||||
|      | ||||
|                         processConnectRequest(ovsdbClient, clientCn, connectNodeInfo); | ||||
|      | ||||
|                         monitorOvsdbStateTables(ovsdbClient, key); | ||||
|      | ||||
|                         connectionsCreated.increment(); | ||||
|                         LOG.info("ovsdbClient connected from {} on port {} AP {} ", remoteHost, localPort, key); | ||||
|                         LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions()); | ||||
|                     } else { | ||||
|                         LOG.debug("ovsdbClient: clientCn is empty - not connecting."); | ||||
|                     } | ||||
|                      | ||||
|                 } catch (IllegalStateException e) { | ||||
|                     connectionsFailed.increment(); | ||||
|                     LOG.error("autoprovisioning error {}", e.getMessage(), e); | ||||
| @@ -181,6 +188,7 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { | ||||
|                 connectionsDropped.increment(); | ||||
|  | ||||
|                 String remoteHost; | ||||
|                 int remotePort; | ||||
|                 int localPort; | ||||
|                 String clientCn; | ||||
|  | ||||
| @@ -196,6 +204,7 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { | ||||
|  | ||||
|                 try { | ||||
|                     remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress(); | ||||
|                     remotePort = ovsdbClient.getConnectionInfo().getRemotePort(); | ||||
|                     localPort = ovsdbClient.getConnectionInfo().getLocalPort(); | ||||
|                     String subjectDn = null; | ||||
|                     try { | ||||
| @@ -206,17 +215,63 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { | ||||
|                         // do nothing | ||||
|                     } | ||||
|                     clientCn = SslUtil.extractCN(subjectDn); | ||||
|                     key = ovsdbSessionMapInterface.lookupClientId(ovsdbClient); | ||||
|                     if (key != null) { | ||||
|                         try { | ||||
|                             extIntegrationInterface.apDisconnected(key); | ||||
|                             ovsdbSessionMapInterface.removeSession(key); | ||||
|                         } catch (Exception e) { | ||||
|                             LOG.debug("Unable to process ap disconnect. {}", e); | ||||
|                     if (clientCn != null && !clientCn.isEmpty()) { | ||||
|                         Long ctxRoutingId =  OvsdbClientUtil.getRoutingId(ovsdbClient); | ||||
|                         Long latestDbRoutingId = extIntegrationInterface.getLatestRoutingId(clientCn); | ||||
|                         OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(clientCn); | ||||
|  | ||||
|                         LOG.info("Determine true disconnect with AP {} ctxRoutingId {} latestDbRoutingId {} ", clientCn, ctxRoutingId, latestDbRoutingId); | ||||
|                         // Determine whether this is a true disconnect based on latest routing record | ||||
|                         if (ctxRoutingId != null && latestDbRoutingId != null) { | ||||
|                             // if context matches latest DB routingId, this is a true disconnect | ||||
|                             if (ctxRoutingId.equals(latestDbRoutingId)) { | ||||
|                                 try { | ||||
|                                     extIntegrationInterface.apDisconnected(clientCn, ctxRoutingId); | ||||
|                                     ovsdbSessionMapInterface.removeSession(clientCn); | ||||
|                                 } catch (Exception e) { | ||||
|                                     LOG.debug("Unable to process ap disconnect. {}", e); | ||||
|                                 } | ||||
|                             } else { | ||||
|                                 if (ovsdbSession != null) { | ||||
|                                     if (!latestDbRoutingId.equals(ovsdbSession.getRoutingId())) { | ||||
|                                         ovsdbSessionMapInterface.removeSession(clientCn); | ||||
|                                     } | ||||
|                                 } | ||||
|                                  | ||||
|                                 extIntegrationInterface.removeRoutingRecord(ctxRoutingId); | ||||
|                             } | ||||
|                              | ||||
|                         // else, clearly handle all other cases | ||||
|                         } else if (ctxRoutingId == null && latestDbRoutingId != null) { | ||||
|                             // session was not initialized properly in connect path, we can remove this session | ||||
|                             LOG.debug("ctxRoutingId null latestDbRoutingId {}, remove non-initialized session for AP {} ", latestDbRoutingId, clientCn); | ||||
|                             if (ovsdbSession != null) { | ||||
|                                 if (ovsdbSession.getRoutingId() == 0L) { | ||||
|                                     ovsdbSessionMapInterface.removeSession(clientCn); | ||||
|                                 } | ||||
|                             } | ||||
|                              | ||||
|                         } else if (ctxRoutingId != null && latestDbRoutingId == null) { | ||||
|                             // no routing exist at all, so we know this session is not using any routing record, we can remove this session | ||||
|                             LOG.debug("ctxRoutingId {} latestDbRoutingId null, no routing exists, remove session for AP {} ", latestDbRoutingId, clientCn); | ||||
|                             if (ovsdbSession != null) { | ||||
|                                 if (ctxRoutingId.equals(ovsdbSession.getRoutingId())) { | ||||
|                                     ovsdbSessionMapInterface.removeSession(clientCn); | ||||
|                                 } | ||||
|                             } | ||||
|                              | ||||
|                         } else { | ||||
|                             // ctxRoutingId == null && latestDbRoutingId == null | ||||
|                             // This session is not initialized properly and no routings exist for any session | ||||
|                             // We can remove this session without any checks | ||||
|                             LOG.debug("ctxRoutingId null latestDbRoutingId null, remove session for AP {} ", latestDbRoutingId, clientCn); | ||||
|                             ovsdbSessionMapInterface.removeSession(clientCn); | ||||
|                         } | ||||
|                          | ||||
|                         LOG.info("ovsdbClient disconnected from {} on local port {} remote port {} clientCn {} ", remoteHost, localPort, | ||||
|                                 remotePort, clientCn); | ||||
|                         LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions()); | ||||
|                     } | ||||
|                     LOG.info("ovsdbClient disconnected from {} on port {} clientCn {} AP {} ", remoteHost, localPort, clientCn, key); | ||||
|                     LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions()); | ||||
|                 } finally { | ||||
|                     try { | ||||
|                         ovsdbClient.shutdown(); | ||||
|   | ||||
| @@ -25,6 +25,10 @@ public class OvsdbClientWithMetrics implements OvsdbClient { | ||||
|         this.delegate = delegate; | ||||
|         this.metrics = metrics; | ||||
|     } | ||||
|      | ||||
|     public OvsdbClient getDelegate() { | ||||
|         return delegate; | ||||
|     } | ||||
|  | ||||
|     public CompletableFuture<String[]> listDatabases() throws OvsdbClientException { | ||||
|         metrics.listDatabases.increment(); | ||||
|   | ||||
| @@ -0,0 +1,65 @@ | ||||
| package com.telecominfraproject.wlan.opensync.util; | ||||
|  | ||||
| import java.lang.reflect.Field; | ||||
|  | ||||
| import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics; | ||||
| import com.vmware.ovsdb.service.OvsdbClient; | ||||
| import com.vmware.ovsdb.service.impl.OvsdbClientImpl; | ||||
|  | ||||
| import io.netty.channel.Channel; | ||||
| import io.netty.util.AttributeKey; | ||||
|  | ||||
| /** | ||||
|  * Uses reflection to associate routingId with low-level netty channel. | ||||
|  *  | ||||
|  * @author dtop | ||||
|  *  | ||||
|  */ | ||||
| public class OvsdbClientUtil { | ||||
|      | ||||
|     public static final AttributeKey<String> routingRecordIdAttrKey = AttributeKey.valueOf("routingRecordId"); | ||||
|  | ||||
|  | ||||
|     public static void setRoutingId(OvsdbClient ovsdbClient, long routingId) { | ||||
|         Channel channel = getChannel(ovsdbClient); | ||||
|         channel.attr(routingRecordIdAttrKey).set(Long.toString(routingId)); | ||||
|     } | ||||
|  | ||||
|     public static Long getRoutingId(OvsdbClient ovsdbClient) { | ||||
|         Channel channel = getChannel(ovsdbClient); | ||||
|         String strVal = channel.attr(routingRecordIdAttrKey).get(); | ||||
|         return strVal==null?null:Long.parseLong(strVal);         | ||||
|     } | ||||
|  | ||||
|      | ||||
|     private static Channel getChannel(OvsdbClient ovsdbClient) { | ||||
|         if(ovsdbClient instanceof OvsdbClientWithMetrics) { | ||||
|             //unwrap the object, if needed | ||||
|             ovsdbClient = ((OvsdbClientWithMetrics) ovsdbClient).getDelegate(); | ||||
|         } | ||||
|          | ||||
|         if(! (ovsdbClient instanceof OvsdbClientImpl)) { | ||||
|             throw new RuntimeException("Do not know how to handle "+ ovsdbClient.getClass().getName()+" - expected OvsdbClientImpl"); | ||||
|         } | ||||
|          | ||||
|         try { | ||||
|             Field jsonRpcClientField = ovsdbClient.getClass().getDeclaredField("jsonRpcClient"); | ||||
|             jsonRpcClientField.setAccessible(true); | ||||
|             Object jsonRpcClientObj = jsonRpcClientField.get(ovsdbClient); | ||||
|              | ||||
|             Field transporterField = jsonRpcClientObj.getClass().getDeclaredField("transporter"); | ||||
|             transporterField.setAccessible(true); | ||||
|             Object transporterObj = transporterField.get(jsonRpcClientObj); | ||||
|              | ||||
|             Field channelField = transporterObj.getClass().getDeclaredField("val$channel"); | ||||
|             channelField.setAccessible(true); | ||||
|              | ||||
|             Channel channel = (Channel) channelField.get(transporterObj); | ||||
|              | ||||
|             return channel; | ||||
|         }catch(Exception e) { | ||||
|             throw new RuntimeException("Cannot get the channel for the ovsdbClient "+ ovsdbClient, e);             | ||||
|         } | ||||
|     } | ||||
|      | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 ralphlee
					ralphlee