From 6b7dcd4878ae41e0e1e0da31ce2a3c8de5a341b6 Mon Sep 17 00:00:00 2001 From: Thomas-Leung2021 Date: Tue, 14 Sep 2021 18:02:13 -0400 Subject: [PATCH 1/6] [NETEXP-2994] add checks for null certificate --- .../opensync/ovsdb/TipWlanOvsdbClient.java | 29 ++++++++++++------- .../ovsdb/TipWlanOvsdbRedirector.java | 23 +++++++++++---- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbClient.java index 8fa2a08..14c72bf 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbClient.java @@ -6,7 +6,6 @@ import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.monitor.BasicCounter; import com.netflix.servo.monitor.Counter; import com.netflix.servo.monitor.MonitorConfig; -import com.netflix.servo.monitor.Monitors; import com.netflix.servo.tag.TagList; import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags; import com.telecominfraproject.wlan.core.model.equipment.MacAddress; @@ -19,7 +18,6 @@ import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSession; import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSessionMapInterface; import com.telecominfraproject.wlan.opensync.external.integration.models.*; import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao; -import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbMonitor; import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics; import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbMetrics; import com.telecominfraproject.wlan.opensync.util.OvsdbStringConstants; @@ -30,6 +28,7 @@ import com.vmware.ovsdb.protocol.methods.*; import com.vmware.ovsdb.protocol.operation.notation.Row; import com.vmware.ovsdb.protocol.operation.notation.Value; import com.vmware.ovsdb.service.OvsdbClient; +import com.vmware.ovsdb.service.OvsdbConnectionInfo; import com.vmware.ovsdb.service.OvsdbPassiveConnectionListener; import io.netty.handler.ssl.SslContext; import org.slf4j.Logger; @@ -39,6 +38,7 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -114,18 +114,25 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { @Override public void connected(OvsdbClient ovsdbClient) { - + connectionsAttempted.increment(); - + if (!(ovsdbClient instanceof OvsdbClientWithMetrics)) { ovsdbClient = new OvsdbClientWithMetrics(ovsdbClient, ovsdbMetrics); } - - String remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress(); - int localPort = ovsdbClient.getConnectionInfo().getLocalPort(); + String subjectDn; try { - subjectDn = ((X509Certificate) ovsdbClient.getConnectionInfo().getRemoteCertificate()).getSubjectDN().getName(); + OvsdbConnectionInfo connectionInfo = ovsdbClient.getConnectionInfo(); + String remoteHost = connectionInfo.getRemoteAddress().getHostAddress(); + Certificate remoteCertificate = connectionInfo.getRemoteCertificate(); + if (remoteCertificate == null) { + LOG.debug("Connect attempt no certificate from {} on remote port {}", remoteHost, connectionInfo.getRemotePort()); + return; + } + + int localPort = connectionInfo.getLocalPort(); + subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName(); String clientCn = SslUtil.extractCN(subjectDn); LOG.info("ovsdbClient connecting from {} on port {} clientCn {}", remoteHost, localPort, clientCn); @@ -186,7 +193,9 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { localPort = ovsdbClient.getConnectionInfo().getLocalPort(); String subjectDn = null; try { - subjectDn = ((X509Certificate) ovsdbClient.getConnectionInfo().getRemoteCertificate()).getSubjectDN().getName(); + Certificate remoteCertificate = ovsdbClient.getConnectionInfo().getRemoteCertificate(); + if (remoteCertificate != null) + subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName(); } catch (Exception e) { // do nothing } @@ -197,7 +206,7 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface { extIntegrationInterface.apDisconnected(key); ovsdbSessionMapInterface.removeSession(key); } catch (Exception e) { - LOG.debug("Unable to process ap disconnect. {}", e.getMessage()); + LOG.debug("Unable to process ap disconnect. {}", e); } } LOG.info("ovsdbClient disconnected from {} on port {} clientCn {} AP {} ", remoteHost, localPort, clientCn, key); diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbRedirector.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbRedirector.java index 9266963..7a37310 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbRedirector.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/ovsdb/TipWlanOvsdbRedirector.java @@ -1,5 +1,6 @@ package com.telecominfraproject.wlan.opensync.ovsdb; +import java.security.cert.Certificate; import java.security.cert.X509Certificate; import javax.annotation.PostConstruct; @@ -14,13 +15,13 @@ import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.monitor.BasicCounter; import com.netflix.servo.monitor.Counter; import com.netflix.servo.monitor.MonitorConfig; -import com.netflix.servo.monitor.Monitors; import com.netflix.servo.tag.TagList; import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags; import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao; import com.telecominfraproject.wlan.opensync.util.SslUtil; import com.vmware.ovsdb.callback.ConnectionCallback; import com.vmware.ovsdb.service.OvsdbClient; +import com.vmware.ovsdb.service.OvsdbConnectionInfo; import com.vmware.ovsdb.service.OvsdbPassiveConnectionListener; import io.netty.handler.ssl.SslContext; @@ -77,11 +78,19 @@ public class TipWlanOvsdbRedirector { ConnectionCallback connectionCallback = new ConnectionCallback() { public void connected(OvsdbClient ovsdbClient) { connectionsAttempted.increment(); - String remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress(); - int localPort = ovsdbClient.getConnectionInfo().getLocalPort(); - String subjectDn = null; + try { - subjectDn = ((X509Certificate) ovsdbClient.getConnectionInfo().getRemoteCertificate()).getSubjectDN().getName(); + OvsdbConnectionInfo connectionInfo = ovsdbClient.getConnectionInfo(); + String remoteHost = connectionInfo.getRemoteAddress().getHostAddress(); + Certificate remoteCertificate = connectionInfo.getRemoteCertificate(); + if (remoteCertificate == null) { + LOG.debug("Connect attempt no certificate from {} on remote port {}", remoteHost, connectionInfo.getRemotePort()); + return; + } + + int localPort = connectionInfo.getLocalPort(); + String subjectDn = null; + subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName(); String clientCn = SslUtil.extractCN(subjectDn); LOG.info("ovsdbClient redirector connected from {} on port {} clientCn {}", remoteHost, localPort, clientCn); @@ -101,7 +110,9 @@ public class TipWlanOvsdbRedirector { int localPort = ovsdbClient.getConnectionInfo().getLocalPort(); String subjectDn = null; try { - subjectDn = ((X509Certificate) ovsdbClient.getConnectionInfo().getRemoteCertificate()).getSubjectDN().getName(); + Certificate remoteCertificate = ovsdbClient.getConnectionInfo().getRemoteCertificate(); + if (remoteCertificate != null) + subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName(); } catch (Exception e) { //do nothing } From ec617cb47f5b24e9b929469c0b3e1f0b06b94bf2 Mon Sep 17 00:00:00 2001 From: Thomas-Leung2021 Date: Thu, 9 Sep 2021 16:03:45 -0400 Subject: [PATCH 2/6] [NETEXP-2957] remove rates unused calls --- .../external/integration/utils/MqttStatsPublisher.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index fd303cd..1a4c929 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -1163,12 +1163,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { if (cl.getStats().hasTxRate()) { cMetrics.setAverageTxRate(cl.getStats().getTxRate() / 1000); } - - if (cl.getStats().hasTxRate() && cl.getStats().hasRxRate()) { - cMetrics.setRates(new int[] {Double.valueOf(cl.getStats().getTxRate() / 1000).intValue(), - Double.valueOf(cl.getStats().getRxRate() / 1000).intValue()}); - } - + if (cl.getStats().hasTxErrors()) { cMetrics.setNumTxDropped((int) cl.getStats().getTxErrors()); } From a5206909769915db8fa68ead27652bcca515e378 Mon Sep 17 00:00:00 2001 From: Thomas-Leung2021 Date: Tue, 31 Aug 2021 17:32:44 -0400 Subject: [PATCH 3/6] [NETEXP-2801] Make Stats and Events publishing asynchronous --- .../utils/AsyncPublishService.java | 365 ++++++++++++++++++ .../integration/utils/MqttStatsPublisher.java | 327 +--------------- .../opensync/mqtt/OpensyncMqttClient.java | 2 +- 3 files changed, 374 insertions(+), 320 deletions(-) create mode 100644 opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/AsyncPublishService.java diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/AsyncPublishService.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/AsyncPublishService.java new file mode 100644 index 0000000..2b9ae28 --- /dev/null +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/AsyncPublishService.java @@ -0,0 +1,365 @@ +package com.telecominfraproject.wlan.opensync.external.integration.utils; + +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import com.google.common.base.Objects; +import com.google.protobuf.ByteString; +import com.telecominfraproject.wlan.client.ClientServiceInterface; +import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails; +import com.telecominfraproject.wlan.client.session.models.AssociationState; +import com.telecominfraproject.wlan.client.session.models.ClientDhcpDetails; +import com.telecominfraproject.wlan.client.session.models.ClientSession; +import com.telecominfraproject.wlan.client.session.models.ClientSessionDetails; +import com.telecominfraproject.wlan.cloudeventdispatcher.CloudEventDispatcherInterface; +import com.telecominfraproject.wlan.core.model.equipment.MacAddress; +import com.telecominfraproject.wlan.opensync.util.OvsdbToWlanCloudTypeMappingUtility; +import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric; + +import sts.OpensyncStats.AssocType; +import sts.OpensyncStats.DeviceType; +import sts.OpensyncStats.EventReport; +import sts.OpensyncStats.FrameType; +import sts.OpensyncStats.Report; +import sts.OpensyncStats.EventReport.ClientAssocEvent; +import sts.OpensyncStats.EventReport.ClientAuthEvent; +import sts.OpensyncStats.EventReport.ClientDisconnectEvent; +import sts.OpensyncStats.EventReport.ClientIpEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/*** + * MqttStatsPublisher will use this class to publish Stats and Events asynchronously. + * We use async to decrease the processing time for the gateway. + * + * Note: @Async only applies on public method and cannot have self-invocation (i.e. cannot + * calling the async method from within the same class) + */ +@Service +public class AsyncPublishService { + + private static final Logger LOG = LoggerFactory.getLogger(MqttStatsPublisher.class); + + @Autowired + private CloudEventDispatcherInterface cloudEventDispatcherInterface; + + @Autowired + private RealtimeEventPublisher realtimeEventPublisher; + + @Autowired + private ClientServiceInterface clientServiceInterface; + + + @Async + public void asyncPublishStats(String apId, List metricRecordList) { + try { + long publishStart = System.nanoTime(); + cloudEventDispatcherInterface.publishMetrics(metricRecordList); + long publishStop = System.nanoTime(); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS)); + } catch (Exception e) { + LOG.error("Exception when trying to publishServiceMetrics.", e); + } + } + + @Async + public void asyncPublishEvents(Report report, int customerId, long equipmentId, String apId, long locationId) { + try { + long mqttEventsStart = System.nanoTime(); + publishEvents(report, customerId, equipmentId, apId, locationId); + long mqttEventsStop = System.nanoTime(); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS)); + } catch (Exception e) { + LOG.error("Exception when trying to publishEvents.", e); + } + } + + void publishEvents(Report report, int customerId, long equipmentId, String apId, long locationId) { + + realtimeEventPublisher.publishSipCallEvents(customerId, equipmentId, locationId, report.getVideoVoiceReportList()); + + for (EventReport eventReport : report.getEventReportList()) { + + for (sts.OpensyncStats.EventReport.ClientSession apEventClientSession : eventReport.getClientSessionList()) { + + LOG.debug("Processing EventReport::ClientSession for AP {}", apId); + // for the following MQTT events, the client/client session is first updated, then the real time event + // is published. + if (apEventClientSession.hasClientAuthEvent()) { + processClientAuthEvent(customerId, equipmentId, locationId, apEventClientSession); + } + if (apEventClientSession.hasClientAssocEvent()) { + processClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession); + } + if (apEventClientSession.hasClientIpEvent()) { + processClientIpEvent(customerId, equipmentId, locationId, apEventClientSession); + } + if (apEventClientSession.hasClientDisconnectEvent()) { + processClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession); + } + } + realtimeEventPublisher.publishChannelHopEvents(customerId, equipmentId, locationId, eventReport); + } + + } + + private void processClientAuthEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { + ClientAuthEvent apClientEvent = apEventClientSession.getClientAuthEvent(); + com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); + + if (client == null) { + client = new com.telecominfraproject.wlan.client.models.Client(); + client.setCustomerId(customerId); + client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + client.setDetails(new ClientInfoDetails()); + client = clientServiceInterface.create(client); + } + + ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); + + if (clientSession == null) { + clientSession = new ClientSession(); + clientSession.setCustomerId(customerId); + clientSession.setEquipmentId(equipmentId); + clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + clientSession.setLocationId(locationId); + clientSession.setDetails(new ClientSessionDetails()); + clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); + } + if (clientSession.getDetails().getPriorEquipmentId() == null) { + clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); + } + if (clientSession.getDetails().getPriorSessionId() == null) { + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); + clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); + clientSession.getDetails().setSsid(apClientEvent.getSsid()); + if (apClientEvent.hasAuthStatus()) { + clientSession.getDetails().setAssociationStatus(apClientEvent.getAuthStatus()); + } + clientSession.getDetails().setAuthTimestamp(apClientEvent.getTimestampMs()); + clientSession.getDetails().setAssociationState(AssociationState._802_11_Authenticated); + clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); + clientSession = clientServiceInterface.updateSession(clientSession); + + realtimeEventPublisher.publishClientAuthSystemEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAuthEvent()); + + } + + private void processClientAssocEvent(int customerId, long equipmentId, long locationId, + sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { + ClientAssocEvent apClientEvent = apEventClientSession.getClientAssocEvent(); + + com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); + + if (client == null) { + client = new com.telecominfraproject.wlan.client.models.Client(); + client.setCustomerId(customerId); + client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + client.setDetails(new ClientInfoDetails()); + client = clientServiceInterface.create(client); + } + + ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); + + if (clientSession == null) { + clientSession = new ClientSession(); + clientSession.setCustomerId(customerId); + clientSession.setEquipmentId(equipmentId); + clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + clientSession.setLocationId(locationId); + clientSession.setDetails(new ClientSessionDetails()); + clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); + } + if (clientSession.getDetails().getPriorEquipmentId() == null) { + clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); + } + if (clientSession.getDetails().getPriorSessionId() == null) { + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); + clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); + clientSession.getDetails().setSsid(apClientEvent.getSsid()); + if (apClientEvent.hasStatus()) { + clientSession.getDetails().setAssociationStatus(apClientEvent.getStatus()); + } + clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); + if (apClientEvent.hasUsing11K()) { + clientSession.getDetails().setIs11KUsed(apClientEvent.getUsing11K()); + } + if (apClientEvent.hasUsing11R()) { + clientSession.getDetails().setIs11RUsed(apClientEvent.getUsing11R()); + } + if (apClientEvent.hasUsing11V()) { + clientSession.getDetails().setIs11VUsed(apClientEvent.getUsing11V()); + } + if (apClientEvent.hasAssocType()) { + clientSession.getDetails().setIsReassociation(apClientEvent.getAssocType().equals(AssocType.REASSOC)); + } + if (apClientEvent.hasRssi()) { + clientSession.getDetails().setAssocRssi(apClientEvent.getRssi()); + } + if (apClientEvent.hasInternalSc()) { + clientSession.getDetails().setAssocInternalSC(apClientEvent.getInternalSc()); + } + clientSession.getDetails().setAssocTimestamp(apClientEvent.getTimestampMs()); + clientSession.getDetails().setAssociationState(AssociationState._802_11_Associated); + clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); + clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAssocEvent()); + + } + + private void processClientIpEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { + ClientIpEvent apClientEvent = apEventClientSession.getClientIpEvent(); + com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); + if (client == null) { + client = new com.telecominfraproject.wlan.client.models.Client(); + client.setCustomerId(customerId); + client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + client.setDetails(new ClientInfoDetails()); + client = clientServiceInterface.create(client); + } + client = clientServiceInterface.update(client); + + ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); + if (clientSession == null) { + clientSession = new ClientSession(); + clientSession.setCustomerId(customerId); + clientSession.setEquipmentId(equipmentId); + clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + clientSession.setLocationId(locationId); + clientSession.setDetails(new ClientSessionDetails()); + clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); + } + if (clientSession.getDetails().getPriorEquipmentId() == null) { + clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); + } + if (clientSession.getDetails().getPriorSessionId() == null) { + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); + if (apClientEvent.hasIpAddr()) { + ByteString ipAddress = apClientEvent.getIpAddr(); + if (ipAddress != null) { + try { + InetAddress inetAddress = InetAddress.getByAddress(ipAddress.toByteArray()); + if (inetAddress instanceof Inet4Address) { + clientSession.getDetails().setIpAddress(inetAddress); + } else if (inetAddress instanceof Inet6Address) { + clientSession.getDetails().setIpAddress(inetAddress); + } else { + LOG.error("Invalid IP Address {}", ipAddress); + } + clientSession.getDetails().setIpTimestamp(apClientEvent.getTimestampMs()); + + } catch (UnknownHostException ex) { + } + } + } + clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); + clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent()); + } + + private void processClientDisconnectEvent(int customerId, long equipmentId, long locationId, + sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { + ClientDisconnectEvent apClientEvent = apEventClientSession.getClientDisconnectEvent(); + com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); + if (client == null) { + client = new com.telecominfraproject.wlan.client.models.Client(); + client.setCustomerId(customerId); + client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + client.setDetails(new ClientInfoDetails()); + client = clientServiceInterface.create(client); + } + ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); + if (clientSession == null) { + clientSession = new ClientSession(); + clientSession.setCustomerId(customerId); + clientSession.setEquipmentId(equipmentId); + clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); + clientSession.setLocationId(locationId); + clientSession.setDetails(new ClientSessionDetails()); + clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); + } + + if (clientSession.getDetails().getPriorSessionId() == null) { + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { + clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); + } + clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); + clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); + clientSession.getDetails().setSsid(apClientEvent.getSsid()); + if (apClientEvent.hasDevType()) { + if (apClientEvent.getDevType().equals(DeviceType.DEV_AP)) { + clientSession.getDetails().setDisconnectByApTimestamp(apClientEvent.getTimestampMs()); + if (apClientEvent.hasInternalRc()) { + clientSession.getDetails().setDisconnectByApInternalReasonCode(apClientEvent.getInternalRc()); + } + if (apClientEvent.hasReason()) { + clientSession.getDetails().setDisconnectByApReasonCode(apClientEvent.getReason()); + } + } else { + clientSession.getDetails().setDisconnectByClientTimestamp(apClientEvent.getTimestampMs()); + if (apClientEvent.hasInternalRc()) { + clientSession.getDetails().setDisconnectByClientInternalReasonCode(apClientEvent.getInternalRc()); + } + if (apClientEvent.hasReason()) { + clientSession.getDetails().setDisconnectByClientReasonCode(apClientEvent.getReason()); + } + } + } + if (apClientEvent.hasFrType()) { + if (apClientEvent.getFrType().equals(FrameType.FT_DEAUTH)) { + } + if (apClientEvent.getFrType().equals(FrameType.FT_DISASSOC)) { + } + } + if (apClientEvent.hasRssi()) { + clientSession.getDetails().setAssocRssi(apClientEvent.getRssi()); + } + if (apClientEvent.hasLrcvUpTsInUs()) { + clientSession.getDetails().setLastRxTimestamp(apClientEvent.getLrcvUpTsInUs()); + } + if (apClientEvent.hasLsentUpTsInUs()) { + clientSession.getDetails().setLastTxTimestamp(apClientEvent.getLsentUpTsInUs()); + } + clientSession.getDetails().setAssociationState(AssociationState.Disconnected); + clientSession.getDetails().setAssocTimestamp(apClientEvent.getTimestampMs()); + clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); + clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession.getClientDisconnectEvent()); + + } + +} diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 1a4c929..a93aa42 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -1,8 +1,7 @@ + package com.telecominfraproject.wlan.opensync.external.integration.utils; -import java.net.Inet4Address; -import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -22,20 +21,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import com.google.common.base.Objects; -import com.google.protobuf.ByteString; import com.telecominfraproject.wlan.alarm.AlarmServiceInterface; import com.telecominfraproject.wlan.alarm.models.Alarm; import com.telecominfraproject.wlan.alarm.models.AlarmCode; import com.telecominfraproject.wlan.alarm.models.AlarmDetails; import com.telecominfraproject.wlan.alarm.models.AlarmScopeType; import com.telecominfraproject.wlan.alarm.models.OriginatorType; -import com.telecominfraproject.wlan.client.ClientServiceInterface; -import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails; -import com.telecominfraproject.wlan.client.session.models.AssociationState; -import com.telecominfraproject.wlan.client.session.models.ClientDhcpDetails; -import com.telecominfraproject.wlan.client.session.models.ClientSession; -import com.telecominfraproject.wlan.client.session.models.ClientSessionDetails; import com.telecominfraproject.wlan.cloudeventdispatcher.CloudEventDispatcherInterface; import com.telecominfraproject.wlan.core.model.equipment.ChannelBandwidth; import com.telecominfraproject.wlan.core.model.equipment.DetectedAuthMode; @@ -85,19 +76,11 @@ import com.telecominfraproject.wlan.status.models.StatusDataType; import com.telecominfraproject.wlan.status.network.models.NetworkAdminStatusData; import com.telecominfraproject.wlan.systemevent.models.SystemEvent; -import sts.OpensyncStats.AssocType; import sts.OpensyncStats.Client; import sts.OpensyncStats.ClientReport; import sts.OpensyncStats.DNSProbeMetric; import sts.OpensyncStats.Device; import sts.OpensyncStats.Device.RadioTemp; -import sts.OpensyncStats.DeviceType; -import sts.OpensyncStats.EventReport; -import sts.OpensyncStats.EventReport.ClientAssocEvent; -import sts.OpensyncStats.EventReport.ClientAuthEvent; -import sts.OpensyncStats.EventReport.ClientDisconnectEvent; -import sts.OpensyncStats.EventReport.ClientIpEvent; -import sts.OpensyncStats.FrameType; import sts.OpensyncStats.Neighbor; import sts.OpensyncStats.Neighbor.NeighborBss; import sts.OpensyncStats.NetworkProbe; @@ -124,13 +107,11 @@ public class MqttStatsPublisher implements StatsPublisherInterface { @Autowired private StatusServiceInterface statusServiceInterface; @Autowired - private ClientServiceInterface clientServiceInterface; - @Autowired private CloudEventDispatcherInterface cloudEventDispatcherInterface; @Autowired - private RealtimeEventPublisher realtimeEventPublisher; - @Autowired private AlarmServiceInterface alarmServiceInterface; + @Autowired + private AsyncPublishService asyncPublishService; @Value("${tip.wlan.mqttStatsPublisher.temperatureThresholdInC:80}") private int temperatureThresholdInC; @@ -148,7 +129,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface { public int statsTimeDriftThresholdSec; @Override - public void processMqttMessage(String topic, Report report) { // Numerous try/catch blocks to address situations where logs are not being reported due to corrupt or invalid // data in mqtt stats causing a crash @@ -256,28 +236,12 @@ public class MqttStatsPublisher implements StatsPublisherInterface { }); - try { - long publishStart = System.nanoTime(); - cloudEventDispatcherInterface.publishMetrics(metricRecordList); - long publishStop = System.nanoTime(); - if (LOG.isDebugEnabled()) - LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId, - TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS)); - } catch (Exception e) { - LOG.error("Exception when trying to publishServiceMetrics.", e); - } + // Make it asynchronous to decrease processing time + asyncPublishService.asyncPublishStats(apId, metricRecordList); } - try { - long mqttEventsStart = System.nanoTime(); - publishEvents(report, customerId, equipmentId, apId, locationId); - long mqttEventsStop = System.nanoTime(); - if (LOG.isDebugEnabled()) - LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId, - TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS)); - } catch (Exception e) { - LOG.error("Exception when trying to publishEvents.", e); - } + // Make it asynchronous to decrease processing time + asyncPublishService.asyncPublishEvents(report, customerId, equipmentId, apId, locationId); try { long endTime = System.nanoTime(); @@ -302,288 +266,13 @@ public class MqttStatsPublisher implements StatsPublisherInterface { LOG.info("processMqttMessage for {} complete", topic); } - + @Override public void publishSystemEventFromTableStateMonitor(SystemEvent event) { LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event); cloudEventDispatcherInterface.publishEvent(event); } - void publishEvents(Report report, int customerId, long equipmentId, String apId, long locationId) { - - realtimeEventPublisher.publishSipCallEvents(customerId, equipmentId, locationId, report.getVideoVoiceReportList()); - - for (EventReport eventReport : report.getEventReportList()) { - - for (sts.OpensyncStats.EventReport.ClientSession apEventClientSession : eventReport.getClientSessionList()) { - - LOG.debug("Processing EventReport::ClientSession for AP {}", apId); - // for the following MQTT events, the client/client session is first updated, then the real time event - // is published. - if (apEventClientSession.hasClientAuthEvent()) { - processClientAuthEvent(customerId, equipmentId, locationId, apEventClientSession); - } - if (apEventClientSession.hasClientAssocEvent()) { - processClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession); - } - if (apEventClientSession.hasClientIpEvent()) { - processClientIpEvent(customerId, equipmentId, locationId, apEventClientSession); - } - if (apEventClientSession.hasClientDisconnectEvent()) { - processClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession); - } - } - realtimeEventPublisher.publishChannelHopEvents(customerId, equipmentId, locationId, eventReport); - } - - } - - protected void processClientDisconnectEvent(int customerId, long equipmentId, long locationId, - sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { - ClientDisconnectEvent apClientEvent = apEventClientSession.getClientDisconnectEvent(); - com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); - if (client == null) { - client = new com.telecominfraproject.wlan.client.models.Client(); - client.setCustomerId(customerId); - client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - client.setDetails(new ClientInfoDetails()); - client = clientServiceInterface.create(client); - } - ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); - if (clientSession == null) { - clientSession = new ClientSession(); - clientSession.setCustomerId(customerId); - clientSession.setEquipmentId(equipmentId); - clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - clientSession.setLocationId(locationId); - clientSession.setDetails(new ClientSessionDetails()); - clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); - } - - if (clientSession.getDetails().getPriorSessionId() == null) { - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); - clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); - clientSession.getDetails().setSsid(apClientEvent.getSsid()); - if (apClientEvent.hasDevType()) { - if (apClientEvent.getDevType().equals(DeviceType.DEV_AP)) { - clientSession.getDetails().setDisconnectByApTimestamp(apClientEvent.getTimestampMs()); - if (apClientEvent.hasInternalRc()) { - clientSession.getDetails().setDisconnectByApInternalReasonCode(apClientEvent.getInternalRc()); - } - if (apClientEvent.hasReason()) { - clientSession.getDetails().setDisconnectByApReasonCode(apClientEvent.getReason()); - } - } else { - clientSession.getDetails().setDisconnectByClientTimestamp(apClientEvent.getTimestampMs()); - if (apClientEvent.hasInternalRc()) { - clientSession.getDetails().setDisconnectByClientInternalReasonCode(apClientEvent.getInternalRc()); - } - if (apClientEvent.hasReason()) { - clientSession.getDetails().setDisconnectByClientReasonCode(apClientEvent.getReason()); - } - } - } - if (apClientEvent.hasFrType()) { - if (apClientEvent.getFrType().equals(FrameType.FT_DEAUTH)) { - } - if (apClientEvent.getFrType().equals(FrameType.FT_DISASSOC)) { - } - } - if (apClientEvent.hasRssi()) { - clientSession.getDetails().setAssocRssi(apClientEvent.getRssi()); - } - if (apClientEvent.hasLrcvUpTsInUs()) { - clientSession.getDetails().setLastRxTimestamp(apClientEvent.getLrcvUpTsInUs()); - } - if (apClientEvent.hasLsentUpTsInUs()) { - clientSession.getDetails().setLastTxTimestamp(apClientEvent.getLsentUpTsInUs()); - } - clientSession.getDetails().setAssociationState(AssociationState.Disconnected); - clientSession.getDetails().setAssocTimestamp(apClientEvent.getTimestampMs()); - clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); - clientSession = clientServiceInterface.updateSession(clientSession); - realtimeEventPublisher.publishClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession.getClientDisconnectEvent()); - - } - - protected void processClientAuthEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { - ClientAuthEvent apClientEvent = apEventClientSession.getClientAuthEvent(); - com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); - - if (client == null) { - client = new com.telecominfraproject.wlan.client.models.Client(); - client.setCustomerId(customerId); - client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - client.setDetails(new ClientInfoDetails()); - client = clientServiceInterface.create(client); - } - - ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); - - if (clientSession == null) { - clientSession = new ClientSession(); - clientSession.setCustomerId(customerId); - clientSession.setEquipmentId(equipmentId); - clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - clientSession.setLocationId(locationId); - clientSession.setDetails(new ClientSessionDetails()); - clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); - } - if (clientSession.getDetails().getPriorEquipmentId() == null) { - clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); - } - if (clientSession.getDetails().getPriorSessionId() == null) { - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); - clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); - clientSession.getDetails().setSsid(apClientEvent.getSsid()); - if (apClientEvent.hasAuthStatus()) { - clientSession.getDetails().setAssociationStatus(apClientEvent.getAuthStatus()); - } - clientSession.getDetails().setAuthTimestamp(apClientEvent.getTimestampMs()); - clientSession.getDetails().setAssociationState(AssociationState._802_11_Authenticated); - clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); - clientSession = clientServiceInterface.updateSession(clientSession); - - realtimeEventPublisher.publishClientAuthSystemEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAuthEvent()); - - } - - protected void processClientAssocEvent(int customerId, long equipmentId, long locationId, - sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { - ClientAssocEvent apClientEvent = apEventClientSession.getClientAssocEvent(); - - com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); - - if (client == null) { - client = new com.telecominfraproject.wlan.client.models.Client(); - client.setCustomerId(customerId); - client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - client.setDetails(new ClientInfoDetails()); - client = clientServiceInterface.create(client); - } - - ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); - - if (clientSession == null) { - clientSession = new ClientSession(); - clientSession.setCustomerId(customerId); - clientSession.setEquipmentId(equipmentId); - clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - clientSession.setLocationId(locationId); - clientSession.setDetails(new ClientSessionDetails()); - clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); - } - if (clientSession.getDetails().getPriorEquipmentId() == null) { - clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); - } - if (clientSession.getDetails().getPriorSessionId() == null) { - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); - clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); - clientSession.getDetails().setSsid(apClientEvent.getSsid()); - if (apClientEvent.hasStatus()) { - clientSession.getDetails().setAssociationStatus(apClientEvent.getStatus()); - } - clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand())); - if (apClientEvent.hasUsing11K()) { - clientSession.getDetails().setIs11KUsed(apClientEvent.getUsing11K()); - } - if (apClientEvent.hasUsing11R()) { - clientSession.getDetails().setIs11RUsed(apClientEvent.getUsing11R()); - } - if (apClientEvent.hasUsing11V()) { - clientSession.getDetails().setIs11VUsed(apClientEvent.getUsing11V()); - } - if (apClientEvent.hasAssocType()) { - clientSession.getDetails().setIsReassociation(apClientEvent.getAssocType().equals(AssocType.REASSOC)); - } - if (apClientEvent.hasRssi()) { - clientSession.getDetails().setAssocRssi(apClientEvent.getRssi()); - } - if (apClientEvent.hasInternalSc()) { - clientSession.getDetails().setAssocInternalSC(apClientEvent.getInternalSc()); - } - clientSession.getDetails().setAssocTimestamp(apClientEvent.getTimestampMs()); - clientSession.getDetails().setAssociationState(AssociationState._802_11_Associated); - clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); - clientSession = clientServiceInterface.updateSession(clientSession); - realtimeEventPublisher.publishClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAssocEvent()); - - } - - protected void processClientIpEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { - ClientIpEvent apClientEvent = apEventClientSession.getClientIpEvent(); - com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac())); - if (client == null) { - client = new com.telecominfraproject.wlan.client.models.Client(); - client.setCustomerId(customerId); - client.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - client.setDetails(new ClientInfoDetails()); - client = clientServiceInterface.create(client); - } - client = clientServiceInterface.update(client); - - ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(apClientEvent.getStaMac())); - if (clientSession == null) { - clientSession = new ClientSession(); - clientSession.setCustomerId(customerId); - clientSession.setEquipmentId(equipmentId); - clientSession.setMacAddress(MacAddress.valueOf(apClientEvent.getStaMac())); - clientSession.setLocationId(locationId); - clientSession.setDetails(new ClientSessionDetails()); - clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId()))); - } - if (clientSession.getDetails().getPriorEquipmentId() == null) { - clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId()); - } - if (clientSession.getDetails().getPriorSessionId() == null) { - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) { - clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId()); - } - clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId())); - if (apClientEvent.hasIpAddr()) { - ByteString ipAddress = apClientEvent.getIpAddr(); - if (ipAddress != null) { - try { - InetAddress inetAddress = InetAddress.getByAddress(ipAddress.toByteArray()); - if (inetAddress instanceof Inet4Address) { - clientSession.getDetails().setIpAddress(inetAddress); - } else if (inetAddress instanceof Inet6Address) { - clientSession.getDetails().setIpAddress(inetAddress); - } else { - LOG.error("Invalid IP Address {}", ipAddress); - } - clientSession.getDetails().setIpTimestamp(apClientEvent.getTimestampMs()); - - } catch (UnknownHostException ex) { - } - } - } - clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); - clientSession = clientServiceInterface.updateSession(clientSession); - realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent()); - } - void populateApNodeMetrics(List metricRecordList, Report report, int customerId, long equipmentId, long locationId) { LOG.info("populateApNodeMetrics for Customer {} Equipment {}", customerId, equipmentId); ApNodeMetrics apNodeMetrics = new ApNodeMetrics(); diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index fe111fa..0a3590d 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -195,7 +195,7 @@ public class OpensyncMqttClient implements ApplicationListener Date: Thu, 2 Sep 2021 11:45:34 -0400 Subject: [PATCH 4/6] [NETEXP-2801] optimize alarm logic to increase ApNode metric performance --- .../integration/utils/MqttStatsPublisher.java | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index a93aa42..62b597d 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -285,6 +285,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { for (Device deviceReport : report.getDeviceList()) { int avgRadioTemp = 0; + // The 3 fixed alarm codes + List alarms = alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(AlarmCode.CPUTemperature, AlarmCode.CPUUtilization, AlarmCode.MemoryUtilization)); ApPerformance apPerformance = new ApPerformance(); apNodeMetrics.setApPerformance(apPerformance); @@ -303,10 +305,10 @@ public class MqttStatsPublisher implements StatsPublisherInterface { apPerformance.setCpuTemperature(avgRadioTemp); } if (avgRadioTemp > temperatureThresholdInC) { - raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, deviceReport.getTimestampMs()); + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, deviceReport.getTimestampMs(), alarms); } else { // Clear any existing temperature alarms for this ap - clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature); + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, alarms); } } @@ -314,10 +316,10 @@ public class MqttStatsPublisher implements StatsPublisherInterface { Integer cpuUtilization = deviceReport.getCpuUtil().getCpuUtil(); apPerformance.setCpuUtilized(new int[] {cpuUtilization}); if (cpuUtilization > cpuUtilThresholdPct) { - raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, deviceReport.getTimestampMs()); + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, deviceReport.getTimestampMs(), alarms); } else { // Clear any existing cpuUtilization alarms - clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization); + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, alarms); } } @@ -329,10 +331,10 @@ public class MqttStatsPublisher implements StatsPublisherInterface { double usedMemory = deviceReport.getMemUtil().getMemUsed(); double totalMemory = deviceReport.getMemUtil().getMemTotal(); if (usedMemory / totalMemory * 100 > memoryUtilThresholdPct) { - raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs()); + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs(), alarms); } else { // Clear any existing cpuUtilization alarms - clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization); + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, alarms); } } @@ -587,16 +589,23 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } - void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode) { - alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)).stream().forEach(a -> { - Alarm alarm = alarmServiceInterface.delete(customerId, equipmentId, a.getAlarmCode(), a.getCreatedTimestamp()); - LOG.debug("Cleared device threshold alarm {}", alarm); - }); + void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, List alarms) { + for (Alarm alarm: alarms) { + if (alarm.getAlarmCode() == alarmCode) { + Alarm removedAlarm = alarmServiceInterface.delete(customerId, equipmentId, alarm.getAlarmCode(), alarm.getCreatedTimestamp()); + LOG.debug("Cleared device threshold alarm {}", removedAlarm); + return; + } + } } - void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs) { - // Raise an alarm for temperature + void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs, List alarms) { + for (Alarm alarm: alarms) { + if (alarm.getAlarmCode() == alarmCode) { + return; + } + } Alarm alarm = new Alarm(); alarm.setCustomerId(customerId); alarm.setEquipmentId(equipmentId); @@ -610,11 +619,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { alarmDetails.setMessage(alarmCode.getDescription()); alarmDetails.setAffectedEquipmentIds(Collections.singletonList(equipmentId)); alarm.setDetails(alarmDetails); - List alarms = alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)); - if (alarms.isEmpty()) { - alarm.setCreatedTimestamp(timestampMs); - alarm = alarmServiceInterface.create(alarm); - } + alarm.setCreatedTimestamp(timestampMs); + alarm = alarmServiceInterface.create(alarm); } private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) { @@ -694,15 +700,12 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } void updateDeviceStatusRadioUtilizationReport(int customerId, long equipmentId, RadioUtilizationReport radioUtilizationReport) { - LOG.info("Processing updateDeviceStatusRadioUtilizationReport for equipmentId {} with RadioUtilizationReport {}", equipmentId, radioUtilizationReport); - Status radioUtilizationStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.RADIO_UTILIZATION); - if (radioUtilizationStatus == null) { + LOG.info("REMOVED StatusServiceInterface GETORNULL - Processing updateDeviceStatusRadioUtilizationReport for equipmentId {} with RadioUtilizationReport {}", equipmentId, radioUtilizationReport); LOG.debug("Create new radioUtilizationStatus"); - radioUtilizationStatus = new Status(); + Status radioUtilizationStatus = new Status(); radioUtilizationStatus.setCustomerId(customerId); radioUtilizationStatus.setEquipmentId(equipmentId); radioUtilizationStatus.setStatusDataType(StatusDataType.RADIO_UTILIZATION); - } radioUtilizationStatus.setDetails(radioUtilizationReport); statusServiceInterface.update(radioUtilizationStatus); } @@ -865,7 +868,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { cMetrics.setNumTxDataRetries((int) cl.getStats().getTxRetries()); } - LOG.debug("ApClientMetrics Report {}", cMetrics); + // Commented to increase performance as it repetitive + // LOG.debug("ApClientMetrics Report {}", cMetrics); } From dfd7e133990fd15fd1277a92b2cb02de21e89d2c Mon Sep 17 00:00:00 2001 From: Thomas-Leung2021 Date: Thu, 2 Sep 2021 13:02:54 -0400 Subject: [PATCH 5/6] [NETEXP-2801] remove extra logs --- .../integration/utils/MqttStatsPublisher.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 62b597d..ae8e686 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -700,12 +700,13 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } void updateDeviceStatusRadioUtilizationReport(int customerId, long equipmentId, RadioUtilizationReport radioUtilizationReport) { - LOG.info("REMOVED StatusServiceInterface GETORNULL - Processing updateDeviceStatusRadioUtilizationReport for equipmentId {} with RadioUtilizationReport {}", equipmentId, radioUtilizationReport); - LOG.debug("Create new radioUtilizationStatus"); - Status radioUtilizationStatus = new Status(); - radioUtilizationStatus.setCustomerId(customerId); - radioUtilizationStatus.setEquipmentId(equipmentId); - radioUtilizationStatus.setStatusDataType(StatusDataType.RADIO_UTILIZATION); + LOG.info("Processing updateDeviceStatusRadioUtilizationReport for equipmentId {} with RadioUtilizationReport {}", equipmentId, radioUtilizationReport); + // remove statusServiceInterface.getOrNull() for better performance (i.e. no createdTimestamp) + LOG.debug("Create new radioUtilizationStatus"); + Status radioUtilizationStatus = new Status(); + radioUtilizationStatus.setCustomerId(customerId); + radioUtilizationStatus.setEquipmentId(equipmentId); + radioUtilizationStatus.setStatusDataType(StatusDataType.RADIO_UTILIZATION); radioUtilizationStatus.setDetails(radioUtilizationReport); statusServiceInterface.update(radioUtilizationStatus); } From c5127e4d8e616905cc022c1acab197c27303d3d5 Mon Sep 17 00:00:00 2001 From: Thomas-Leung2021 Date: Wed, 8 Sep 2021 14:42:47 -0400 Subject: [PATCH 6/6] NETEXP-2801 optimise getBssidForClientSsid() to improve performance --- .../integration/utils/MqttStatsPublisher.java | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index ae8e686..ea3c77b 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -942,6 +942,9 @@ public class MqttStatsPublisher implements StatsPublisherInterface { void populateApSsidMetrics(List metricRecordList, Report report, int customerId, long equipmentId, String apId, long locationId) { LOG.debug("populateApSsidMetrics start"); + + Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS); + LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus); if (report.getClientsCount() == 0) { LOG.info("populateApSsidMetrics no client data present, cannot build {}", ServiceMetricDataType.ApSsid); @@ -984,7 +987,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { stats.setSourceTimestampMs(clientReport.getTimestampMs()); // Get the BSSID (MAC address) for this SSID - String bssid = getBssidForClientSsid(customerId, equipmentId, apId, e.getKey(), radioType); + String bssid = getBssidForClientSsid(apId, e.getKey(), radioType, activeBssidsStatus); if (bssid != null) stats.setBssid(MacAddress.valueOf(bssid)); else @@ -1031,26 +1034,20 @@ public class MqttStatsPublisher implements StatsPublisherInterface { LOG.debug("populateApSsidMetrics finished"); } - String getBssidForClientSsid(int customerId, long equipmentId, String apId, String ssid, RadioType radioType) { + String getBssidForClientSsid(String apId, String ssid, RadioType radioType, Status activeBssidsStatus) { try { - Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS); - LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus); - if (activeBssidsStatus != null) { - if (activeBssidsStatus.getDetails() != null) { - ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails(); - if (activeBssids.getActiveBSSIDs() != null) { - for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) { - if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) { - if (activeBssid.getSsid() != null && activeBssid.getSsid().equals(ssid)) { - if (activeBssid.getBssid() != null) { - return activeBssid.getBssid(); - } - } - } - } - } - } - } + ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails(); + if (activeBssids.getActiveBSSIDs() != null) { + for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) { + if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) { + if (activeBssid.getSsid() != null && activeBssid.getSsid().equals(ssid)) { + if (activeBssid.getBssid() != null) { + return activeBssid.getBssid(); + } + } + } + } + } } catch (Exception e) { LOG.error("Could not get active BSSIDs for apId {} radioType {}", apId, radioType, e); }