diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index fecfb9a..e4c4189 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -7,13 +7,14 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfo import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfoReports; import com.telecominfraproject.wlan.servicemetric.client.models.ClientMetrics; import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric; +import com.telecominfraproject.wlan.servicemetric.models.ServiceMetricDataType; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports; import com.telecominfraproject.wlan.status.StatusServiceInterface; @@ -157,13 +159,16 @@ public class MqttStatsPublisher implements StatsPublisherInterface { @Override @Async public void processMqttMessage(String topic, Report report) { + // Numerous try/catch blocks to address situations where logs are not being reported due to corrupt or invalid + // data in mqtt stats causing a crash + LOG.info("processMqttMessage for {} start", topic); long startTime = System.nanoTime(); String apId = extractApIdFromTopic(topic); - LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); + Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId); if (ce == null) { - LOG.warn("Cannot get equipment for inventoryId {}. Ignore mqtt message for topic {}", apId, topic); + LOG.error("Cannot get equipment for inventoryId {}. Ignore mqtt message for topic {}. Exiting processMqttMessage without processing report.", apId, topic); return; } @@ -179,36 +184,61 @@ public class MqttStatsPublisher implements StatsPublisherInterface { long clientMetricsStart = System.nanoTime(); populateApClientMetrics(metricRecordList, report, customerId, equipmentId, locationId); long clientMetricsStop = System.nanoTime(); - LOG.debug("Elapsed time for constructing Client metrics record was {} milliseconds for topic {}", - TimeUnit.MILLISECONDS.convert(clientMetricsStop - clientMetricsStart, TimeUnit.NANOSECONDS), topic); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed time for constructing Client metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(clientMetricsStop - clientMetricsStart, TimeUnit.NANOSECONDS), topic); + } catch (Exception e) { + LOG.error("Exception when trying to populateApClientMetrics.", e); + } + try { long nodeMetricsStart = System.nanoTime(); populateApNodeMetrics(metricRecordList, report, customerId, equipmentId, locationId); long nodeMetricsStop = System.nanoTime(); - LOG.debug("Elapsed time for constructing ApNode metrics record was {} milliseconds for topic {}", - TimeUnit.MILLISECONDS.convert(nodeMetricsStop - nodeMetricsStart, TimeUnit.NANOSECONDS), topic); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed time for constructing ApNode metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(nodeMetricsStop - nodeMetricsStart, TimeUnit.NANOSECONDS), topic); + } catch (Exception e) { + LOG.error("Exception when trying to populateApNodeMetrics.", e); + } + try { long neighbourScanStart = System.nanoTime(); populateNeighbourScanReports(metricRecordList, report, customerId, equipmentId, locationId); long neighbourScanStop = System.nanoTime(); - LOG.debug("Elapsed time for constructing Neighbour metrics record was {} milliseconds for topic {}", - TimeUnit.MILLISECONDS.convert(neighbourScanStop - neighbourScanStart, TimeUnit.NANOSECONDS), topic); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed time for constructing Neighbour metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(neighbourScanStop - neighbourScanStart, TimeUnit.NANOSECONDS), topic); + } catch (Exception e) { + LOG.error("Exception when trying to populateNeighbourScanReports.", e); + } + try { long channelInfoStart = System.nanoTime(); populateChannelInfoReports(metricRecordList, report, customerId, equipmentId, locationId, profileId); long channelInfoStop = System.nanoTime(); - LOG.debug("Elapsed time for constructing Channel metrics record was {} milliseconds for topic {}", - TimeUnit.MILLISECONDS.convert(channelInfoStop - channelInfoStart, TimeUnit.NANOSECONDS), topic); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed time for constructing Channel metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(channelInfoStop - channelInfoStart, TimeUnit.NANOSECONDS), topic); + } catch (Exception e) { + LOG.error("Exception when trying to populateChannelInfoReports.", e); + } + try { long ssidStart = System.nanoTime(); populateApSsidMetrics(metricRecordList, report, customerId, equipmentId, apId, locationId); long ssidStop = System.nanoTime(); - LOG.debug("Elapsed time for constructing ApSsid metrics record was {} milliseconds for topic {}", - TimeUnit.MILLISECONDS.convert(ssidStop - ssidStart, TimeUnit.NANOSECONDS), topic); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed time for constructing ApSsid metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(ssidStop - ssidStart, TimeUnit.NANOSECONDS), topic); + } catch (Exception e) { + LOG.error("Exception when trying to populateApSsidMetrics.", e); + } - if (!metricRecordList.isEmpty()) { - long serviceMetricTimestamp = System.currentTimeMillis(); - metricRecordList.stream().forEach(smr -> { + if (!metricRecordList.isEmpty()) { + long serviceMetricTimestamp = System.currentTimeMillis(); + metricRecordList.stream().forEach(smr -> { + try { // TODO use serviceMetricTimestamp rather than 0. This is done for now since there are some // channel metrics that have overlapping keys which messes up Cassandra if the same time stamp is // used @@ -229,21 +259,36 @@ public class MqttStatsPublisher implements StatsPublisherInterface { LOG.warn("AP {} stats report is {} seconds behind cloud. ServiceMetric {} sourceTimestampMs {} createdTimestampMs {}.", apId, diffSec, smr.getDataType(), sourceTimestamp, serviceMetricTimestamp); } + } catch (Exception e) { + LOG.error("Exception when trying to set ServiceMetric timestamps and base values where not present.", e); + } - }); + }); + + try { long publishStart = System.nanoTime(); cloudEventDispatcherInterface.publishMetrics(metricRecordList); long publishStop = System.nanoTime(); - LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId, - TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS)); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS)); + } catch (Exception e) { + LOG.error("Exception when trying to publishServiceMetrics.", e); } + } + try { long mqttEventsStart = System.nanoTime(); publishEvents(report, customerId, equipmentId, apId, locationId); long mqttEventsStop = System.nanoTime(); - LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId, - TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS)); + if (LOG.isDebugEnabled()) + LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS)); + } catch (Exception e) { + LOG.error("Exception when trying to publishEvents.", e); + } + try { long endTime = System.nanoTime(); long elapsedTimeMillis = TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS); long elapsedTimeSeconds = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS); @@ -253,15 +298,18 @@ public class MqttStatsPublisher implements StatsPublisherInterface { elapsedTimeSeconds, report); } else { if (elapsedTimeSeconds < 1) { - LOG.debug("Total elapsed processing time {} milliseconds for stats messages from AP {}", elapsedTimeMillis, apId); + if (LOG.isDebugEnabled()) + LOG.debug("Total elapsed processing time {} milliseconds for stats messages from AP {}", elapsedTimeMillis, apId); } else { - LOG.debug("Total elapsed processing time {} seconds for stats messages from AP {}", elapsedTimeSeconds, apId); + if (LOG.isDebugEnabled()) + LOG.debug("Total elapsed processing time {} seconds for stats messages from AP {}", elapsedTimeSeconds, apId); } } } catch (Exception e) { - LOG.error("Exception when processing stats messages from AP", e); + LOG.error("Exception when calculating elapsed time for metrics processing.", e); } + LOG.info("processMqttMessage for {} complete", topic); } @Override @@ -1561,121 +1609,120 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } void populateApSsidMetrics(List metricRecordList, Report report, int customerId, long equipmentId, String apId, long locationId) { + LOG.debug("populateApSsidMetrics start"); + + if (report.getClientsCount() == 0) { + LOG.info("populateApSsidMetrics no client data present, cannot build {}", ServiceMetricDataType.ApSsid); + return; + } + + LOG.debug("populateApSsidMetrics for Customer {} Equipment {} LocationId {} AP {}", customerId, equipmentId, locationId, apId); - LOG.info("populateApSsidMetrics for Customer {} Equipment {}", customerId, equipmentId); ServiceMetric smr = new ServiceMetric(customerId, equipmentId); smr.setLocationId(locationId); + smr.setDataType(ServiceMetricDataType.ApSsid); ApSsidMetrics apSsidMetrics = new ApSsidMetrics(); smr.setDetails(apSsidMetrics); metricRecordList.add(smr); for (ClientReport clientReport : report.getClientsList()) { - - LOG.debug("ClientReport for channel {} RadioBand {}", clientReport.getChannel(), clientReport.getBand()); - - // The service metric report's sourceTimestamp will be the most recent ClientReport timestamp - if (apSsidMetrics.getSourceTimestampMs() < clientReport.getTimestampMs()) - apSsidMetrics.setSourceTimestampMs(clientReport.getTimestampMs()); - - long txBytes = 0L; - long rxBytes = 0L; - long txFrames = 0L; - long rxFrames = 0L; - - int txErrors = 0; - int rxErrors = 0; - - int txRetries = 0; - int rxRetries = 0; - - int lastRssi = 0; - String ssid = null; - - Set clientMacs = new HashSet<>(); - RadioType radioType = OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(clientReport.getBand()); - SsidStatistics ssidStatistics = new SsidStatistics(); - // GET the Radio IF MAC (BSSID) from the activeBSSIDs - ssidStatistics.setSourceTimestampMs(clientReport.getTimestampMs()); - try { - Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS); - LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus); - if (activeBssidsStatus != null) { - if (activeBssidsStatus.getDetails() != null) { - ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails(); - if (activeBssids.getActiveBSSIDs() != null) { - for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) { - if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) { + LOG.debug("populateApSsidMetrics processing clientReport for RadioType {} Channel {}", radioType, clientReport.getChannel()); + Map> clientBySsid = clientReport.getClientListList().stream().filter(new Predicate() { + @Override + public boolean test(Client t) { + return t.hasSsid() && t.hasStats(); + } + }).collect(Collectors.groupingBy(c -> c.getSsid())); + + if (LOG.isTraceEnabled()) + LOG.trace("populateApSsidMetrics clientBySsid {}", clientBySsid); + + final List ssidStats = new ArrayList<>(); + + clientBySsid.entrySet().stream().forEach(e -> { + + if (LOG.isTraceEnabled()) + LOG.trace("populateApSsidMetrics processing clients {}", e.getValue()); + + SsidStatistics stats = new SsidStatistics(); + stats.setSsid(e.getKey()); + stats.setNumClient(e.getValue().size()); + stats.setSourceTimestampMs(clientReport.getTimestampMs()); + + // Get the BSSID (MAC address) for this SSID + String bssid = getBssidForClientSsid(customerId, equipmentId, apId, e.getKey(), radioType); + if (bssid != null) + stats.setBssid(MacAddress.valueOf(bssid)); + else + LOG.warn("Could not get BSSID for customer {} equipment {} apId {}, ssid {} radioType {}", customerId, equipmentId, apId, e.getKey(), + radioType); + + long txBytes = e.getValue().stream().mapToLong(c -> c.getStats().getTxBytes()).sum(); + long rxBytes = e.getValue().stream().mapToLong(c -> c.getStats().getRxBytes()).sum(); + long txFrame = e.getValue().stream().mapToLong(c -> c.getStats().getTxFrames()).sum(); + long rxFrame = e.getValue().stream().mapToLong(c -> c.getStats().getRxFrames()).sum(); + + long txErrors = e.getValue().stream().mapToLong(c -> c.getStats().getTxErrors()).sum(); + long rxErrors = e.getValue().stream().mapToLong(c -> c.getStats().getRxErrors()).sum(); + + long txRetries = e.getValue().stream().mapToLong(c -> c.getStats().getTxRetries()).sum(); + long rxRetries = e.getValue().stream().mapToLong(c -> c.getStats().getRxRetries()).sum(); + + int[] rssi = e.getValue().stream().mapToInt(c -> c.getStats().getRssi()).toArray(); + double avgRssi = DecibelUtils.getAverageDecibel(rssi); + + stats.setRxLastRssi(Double.valueOf(avgRssi).intValue()); + stats.setNumRxData(Long.valueOf(rxFrame).intValue()); + stats.setRxBytes(rxBytes - rxErrors - rxRetries); + stats.setNumTxDataRetries(Long.valueOf(txRetries).intValue()); + stats.setNumRcvFrameForTx(txFrame); + stats.setNumTxBytesSucc(txBytes - txErrors - txRetries); + stats.setNumRxRetry(Long.valueOf(rxRetries).intValue()); + + if (LOG.isTraceEnabled()) + LOG.trace("populateApSsidMetrics stats {}", stats.toPrettyString()); + ssidStats.add(stats); + + }); + + if (LOG.isTraceEnabled()) + LOG.trace("populateApSsidMetrics ssidStats {}", ssidStats); + apSsidMetrics.getSsidStats().put(radioType, ssidStats); + + } + + if (LOG.isTraceEnabled()) + LOG.trace("populateApSsidMetrics apSsidMetrics {}", apSsidMetrics); + + LOG.debug("populateApSsidMetrics finished"); + } + + String getBssidForClientSsid(int customerId, long equipmentId, String apId, String ssid, RadioType radioType) { + try { + Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS); + LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus); + if (activeBssidsStatus != null) { + if (activeBssidsStatus.getDetails() != null) { + ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails(); + if (activeBssids.getActiveBSSIDs() != null) { + for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) { + if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) { + if (activeBssid.getSsid() != null && activeBssid.getSsid().equals(ssid)) { if (activeBssid.getBssid() != null) { - ssidStatistics.setBssid(MacAddress.valueOf(activeBssid.getBssid())); + return activeBssid.getBssid(); } } } } } } - } catch (Exception e) { - LOG.error("Could not get active BSSIDs for apId {} radioType {}", apId, radioType, e); } - - if (LOG.isTraceEnabled()) - LOG.trace("Client Report Date is {}", new Date(clientReport.getTimestampMs())); - int numConnectedClients = 0; - for (Client client : clientReport.getClientListList()) { - if (client.hasStats()) { - - if (client.hasSsid()) { - ssid = client.getSsid(); - } - - if (client.hasMacAddress()) { - clientMacs.add(client.getMacAddress()); - - } else { - continue; // cannot have a session without a MAC address - } - - rxBytes += client.getStats().getRxBytes(); - txBytes += client.getStats().getTxBytes(); - rxFrames += client.getStats().getRxFrames(); - txFrames += client.getStats().getTxFrames(); - rxRetries += client.getStats().getRxRetries(); - txRetries += client.getStats().getTxRetries(); - rxErrors += client.getStats().getRxErrors(); - txErrors += client.getStats().getTxErrors(); - lastRssi = client.getStats().getRssi(); - - if (client.hasConnected() && client.getConnected() && client.hasMacAddress()) { - numConnectedClients += 1; - } - } - - } - - ssidStatistics.setRxLastRssi(lastRssi); - ssidStatistics.setNumRxData(Long.valueOf(rxFrames).intValue()); - ssidStatistics.setRxBytes(rxBytes - rxErrors - rxRetries); - ssidStatistics.setNumTxDataRetries(txRetries); - ssidStatistics.setNumRcvFrameForTx(txFrames); - ssidStatistics.setNumTxBytesSucc(txBytes - txErrors - txRetries); - ssidStatistics.setNumRxRetry(rxRetries); - ssidStatistics.setNumClient(numConnectedClients); - ssidStatistics.setSsid(ssid); - - if (radioType != null) { - List ssidStatsList = apSsidMetrics.getSsidStats().get(radioType); - if (ssidStatsList == null) { - ssidStatsList = new ArrayList<>(); - } - ssidStatsList.add(ssidStatistics); - apSsidMetrics.getSsidStats().put(radioType, ssidStatsList); - } - + } catch (Exception e) { + LOG.error("Could not get active BSSIDs for apId {} radioType {}", apId, radioType, e); } - - LOG.debug("ApSsidMetrics {}", apSsidMetrics); - + return null; } ChannelInfo createChannelInfo(long equipmentId, RadioType radioType, List surveySampleList, ChannelBandwidth channelBandwidth) {