[WIFI-3287] Some metrics show choppy behavior while the others are OK

Signed-off-by: Mike Hansen <mike.hansen@netexperience.com>
This commit is contained in:
Mike Hansen
2021-07-29 11:27:24 -04:00
parent 8105c5bc9d
commit a4559b4f37

View File

@@ -7,13 +7,14 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfo
import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfoReports;
import com.telecominfraproject.wlan.servicemetric.client.models.ClientMetrics;
import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric;
import com.telecominfraproject.wlan.servicemetric.models.ServiceMetricDataType;
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport;
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports;
import com.telecominfraproject.wlan.status.StatusServiceInterface;
@@ -157,13 +159,16 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
@Override
@Async
public void processMqttMessage(String topic, Report report) {
// Each processing stage below is wrapped in its own try/catch so that corrupt or invalid data in the mqtt stats
// cannot crash the whole method and prevent the remaining stages (and their logs) from being reported
LOG.info("processMqttMessage for {} start", topic);
long startTime = System.nanoTime();
String apId = extractApIdFromTopic(topic);
LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID());
Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId);
if (ce == null) {
LOG.warn("Cannot get equipment for inventoryId {}. Ignore mqtt message for topic {}", apId, topic);
LOG.error("Cannot get equipment for inventoryId {}. Ignore mqtt message for topic {}. Exiting processMqttMessage without processing report.", apId, topic);
return;
}
@@ -179,36 +184,61 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
long clientMetricsStart = System.nanoTime();
populateApClientMetrics(metricRecordList, report, customerId, equipmentId, locationId);
long clientMetricsStop = System.nanoTime();
LOG.debug("Elapsed time for constructing Client metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(clientMetricsStop - clientMetricsStart, TimeUnit.NANOSECONDS), topic);
if (LOG.isDebugEnabled())
LOG.debug("Elapsed time for constructing Client metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(clientMetricsStop - clientMetricsStart, TimeUnit.NANOSECONDS), topic);
} catch (Exception e) {
LOG.error("Exception when trying to populateApClientMetrics.", e);
}
try {
long nodeMetricsStart = System.nanoTime();
populateApNodeMetrics(metricRecordList, report, customerId, equipmentId, locationId);
long nodeMetricsStop = System.nanoTime();
LOG.debug("Elapsed time for constructing ApNode metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(nodeMetricsStop - nodeMetricsStart, TimeUnit.NANOSECONDS), topic);
if (LOG.isDebugEnabled())
LOG.debug("Elapsed time for constructing ApNode metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(nodeMetricsStop - nodeMetricsStart, TimeUnit.NANOSECONDS), topic);
} catch (Exception e) {
LOG.error("Exception when trying to populateApNodeMetrics.", e);
}
try {
long neighbourScanStart = System.nanoTime();
populateNeighbourScanReports(metricRecordList, report, customerId, equipmentId, locationId);
long neighbourScanStop = System.nanoTime();
LOG.debug("Elapsed time for constructing Neighbour metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(neighbourScanStop - neighbourScanStart, TimeUnit.NANOSECONDS), topic);
if (LOG.isDebugEnabled())
LOG.debug("Elapsed time for constructing Neighbour metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(neighbourScanStop - neighbourScanStart, TimeUnit.NANOSECONDS), topic);
} catch (Exception e) {
LOG.error("Exception when trying to populateNeighbourScanReports.", e);
}
try {
long channelInfoStart = System.nanoTime();
populateChannelInfoReports(metricRecordList, report, customerId, equipmentId, locationId, profileId);
long channelInfoStop = System.nanoTime();
LOG.debug("Elapsed time for constructing Channel metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(channelInfoStop - channelInfoStart, TimeUnit.NANOSECONDS), topic);
if (LOG.isDebugEnabled())
LOG.debug("Elapsed time for constructing Channel metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(channelInfoStop - channelInfoStart, TimeUnit.NANOSECONDS), topic);
} catch (Exception e) {
LOG.error("Exception when trying to populateChannelInfoReports.", e);
}
try {
long ssidStart = System.nanoTime();
populateApSsidMetrics(metricRecordList, report, customerId, equipmentId, apId, locationId);
long ssidStop = System.nanoTime();
LOG.debug("Elapsed time for constructing ApSsid metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(ssidStop - ssidStart, TimeUnit.NANOSECONDS), topic);
if (LOG.isDebugEnabled())
LOG.debug("Elapsed time for constructing ApSsid metrics record was {} milliseconds for topic {}",
TimeUnit.MILLISECONDS.convert(ssidStop - ssidStart, TimeUnit.NANOSECONDS), topic);
} catch (Exception e) {
LOG.error("Exception when trying to populateApSsidMetrics.", e);
}
if (!metricRecordList.isEmpty()) {
long serviceMetricTimestamp = System.currentTimeMillis();
metricRecordList.stream().forEach(smr -> {
if (!metricRecordList.isEmpty()) {
long serviceMetricTimestamp = System.currentTimeMillis();
metricRecordList.stream().forEach(smr -> {
try {
// TODO use serviceMetricTimestamp rather than 0. Zero is used for now because some
// channel metrics have overlapping keys, which breaks Cassandra when the same timestamp is
// used
@@ -229,21 +259,36 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
LOG.warn("AP {} stats report is {} seconds behind cloud. ServiceMetric {} sourceTimestampMs {} createdTimestampMs {}.", apId, diffSec,
smr.getDataType(), sourceTimestamp, serviceMetricTimestamp);
}
} catch (Exception e) {
LOG.error("Exception when trying to set ServiceMetric timestamps and base values where not present.", e);
}
});
});
try {
long publishStart = System.nanoTime();
cloudEventDispatcherInterface.publishMetrics(metricRecordList);
long publishStop = System.nanoTime();
LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId,
TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS));
if (LOG.isDebugEnabled())
LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId,
TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS));
} catch (Exception e) {
LOG.error("Exception when trying to publishServiceMetrics.", e);
}
}
try {
long mqttEventsStart = System.nanoTime();
publishEvents(report, customerId, equipmentId, apId, locationId);
long mqttEventsStop = System.nanoTime();
LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId,
TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS));
if (LOG.isDebugEnabled())
LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId,
TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS));
} catch (Exception e) {
LOG.error("Exception when trying to publishEvents.", e);
}
try {
long endTime = System.nanoTime();
long elapsedTimeMillis = TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS);
long elapsedTimeSeconds = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS);
@@ -253,15 +298,18 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
elapsedTimeSeconds, report);
} else {
if (elapsedTimeSeconds < 1) {
LOG.debug("Total elapsed processing time {} milliseconds for stats messages from AP {}", elapsedTimeMillis, apId);
if (LOG.isDebugEnabled())
LOG.debug("Total elapsed processing time {} milliseconds for stats messages from AP {}", elapsedTimeMillis, apId);
} else {
LOG.debug("Total elapsed processing time {} seconds for stats messages from AP {}", elapsedTimeSeconds, apId);
if (LOG.isDebugEnabled())
LOG.debug("Total elapsed processing time {} seconds for stats messages from AP {}", elapsedTimeSeconds, apId);
}
}
} catch (Exception e) {
LOG.error("Exception when processing stats messages from AP", e);
LOG.error("Exception when calculating elapsed time for metrics processing.", e);
}
LOG.info("processMqttMessage for {} complete", topic);
}
@Override
@@ -1561,121 +1609,120 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
}
void populateApSsidMetrics(List<ServiceMetric> metricRecordList, Report report, int customerId, long equipmentId, String apId, long locationId) {
LOG.debug("populateApSsidMetrics start");
if (report.getClientsCount() == 0) {
LOG.info("populateApSsidMetrics no client data present, cannot build {}", ServiceMetricDataType.ApSsid);
return;
}
LOG.debug("populateApSsidMetrics for Customer {} Equipment {} LocationId {} AP {}", customerId, equipmentId, locationId, apId);
LOG.info("populateApSsidMetrics for Customer {} Equipment {}", customerId, equipmentId);
ServiceMetric smr = new ServiceMetric(customerId, equipmentId);
smr.setLocationId(locationId);
smr.setDataType(ServiceMetricDataType.ApSsid);
ApSsidMetrics apSsidMetrics = new ApSsidMetrics();
smr.setDetails(apSsidMetrics);
metricRecordList.add(smr);
for (ClientReport clientReport : report.getClientsList()) {
LOG.debug("ClientReport for channel {} RadioBand {}", clientReport.getChannel(), clientReport.getBand());
// The service metric report's sourceTimestamp will be the most recent ClientReport timestamp
if (apSsidMetrics.getSourceTimestampMs() < clientReport.getTimestampMs())
apSsidMetrics.setSourceTimestampMs(clientReport.getTimestampMs());
long txBytes = 0L;
long rxBytes = 0L;
long txFrames = 0L;
long rxFrames = 0L;
int txErrors = 0;
int rxErrors = 0;
int txRetries = 0;
int rxRetries = 0;
int lastRssi = 0;
String ssid = null;
Set<String> clientMacs = new HashSet<>();
RadioType radioType = OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(clientReport.getBand());
SsidStatistics ssidStatistics = new SsidStatistics();
// GET the Radio IF MAC (BSSID) from the activeBSSIDs
ssidStatistics.setSourceTimestampMs(clientReport.getTimestampMs());
try {
Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS);
LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus);
if (activeBssidsStatus != null) {
if (activeBssidsStatus.getDetails() != null) {
ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails();
if (activeBssids.getActiveBSSIDs() != null) {
for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) {
if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) {
LOG.debug("populateApSsidMetrics processing clientReport for RadioType {} Channel {}", radioType, clientReport.getChannel());
Map<String, List<Client>> clientBySsid = clientReport.getClientListList().stream().filter(new Predicate<Client>() {
@Override
public boolean test(Client t) {
return t.hasSsid() && t.hasStats();
}
}).collect(Collectors.groupingBy(c -> c.getSsid()));
if (LOG.isTraceEnabled())
LOG.trace("populateApSsidMetrics clientBySsid {}", clientBySsid);
final List<SsidStatistics> ssidStats = new ArrayList<>();
clientBySsid.entrySet().stream().forEach(e -> {
if (LOG.isTraceEnabled())
LOG.trace("populateApSsidMetrics processing clients {}", e.getValue());
SsidStatistics stats = new SsidStatistics();
stats.setSsid(e.getKey());
stats.setNumClient(e.getValue().size());
stats.setSourceTimestampMs(clientReport.getTimestampMs());
// Get the BSSID (MAC address) for this SSID
String bssid = getBssidForClientSsid(customerId, equipmentId, apId, e.getKey(), radioType);
if (bssid != null)
stats.setBssid(MacAddress.valueOf(bssid));
else
LOG.warn("Could not get BSSID for customer {} equipment {} apId {}, ssid {} radioType {}", customerId, equipmentId, apId, e.getKey(),
radioType);
long txBytes = e.getValue().stream().mapToLong(c -> c.getStats().getTxBytes()).sum();
long rxBytes = e.getValue().stream().mapToLong(c -> c.getStats().getRxBytes()).sum();
long txFrame = e.getValue().stream().mapToLong(c -> c.getStats().getTxFrames()).sum();
long rxFrame = e.getValue().stream().mapToLong(c -> c.getStats().getRxFrames()).sum();
long txErrors = e.getValue().stream().mapToLong(c -> c.getStats().getTxErrors()).sum();
long rxErrors = e.getValue().stream().mapToLong(c -> c.getStats().getRxErrors()).sum();
long txRetries = e.getValue().stream().mapToLong(c -> c.getStats().getTxRetries()).sum();
long rxRetries = e.getValue().stream().mapToLong(c -> c.getStats().getRxRetries()).sum();
int[] rssi = e.getValue().stream().mapToInt(c -> c.getStats().getRssi()).toArray();
double avgRssi = DecibelUtils.getAverageDecibel(rssi);
stats.setRxLastRssi(Double.valueOf(avgRssi).intValue());
stats.setNumRxData(Long.valueOf(rxFrame).intValue());
stats.setRxBytes(rxBytes - rxErrors - rxRetries);
stats.setNumTxDataRetries(Long.valueOf(txRetries).intValue());
stats.setNumRcvFrameForTx(txFrame);
stats.setNumTxBytesSucc(txBytes - txErrors - txRetries);
stats.setNumRxRetry(Long.valueOf(rxRetries).intValue());
if (LOG.isTraceEnabled())
LOG.trace("populateApSsidMetrics stats {}", stats.toPrettyString());
ssidStats.add(stats);
});
if (LOG.isTraceEnabled())
LOG.trace("populateApSsidMetrics ssidStats {}", ssidStats);
apSsidMetrics.getSsidStats().put(radioType, ssidStats);
}
if (LOG.isTraceEnabled())
LOG.trace("populateApSsidMetrics apSsidMetrics {}", apSsidMetrics);
LOG.debug("populateApSsidMetrics finished");
}
String getBssidForClientSsid(int customerId, long equipmentId, String apId, String ssid, RadioType radioType) {
try {
Status activeBssidsStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.ACTIVE_BSSIDS);
LOG.debug("populateApSsidMetrics get BSSID from activeBssids {}", activeBssidsStatus);
if (activeBssidsStatus != null) {
if (activeBssidsStatus.getDetails() != null) {
ActiveBSSIDs activeBssids = (ActiveBSSIDs) activeBssidsStatus.getDetails();
if (activeBssids.getActiveBSSIDs() != null) {
for (ActiveBSSID activeBssid : activeBssids.getActiveBSSIDs()) {
if (activeBssid.getRadioType() != null && activeBssid.getRadioType().equals(radioType)) {
if (activeBssid.getSsid() != null && activeBssid.getSsid().equals(ssid)) {
if (activeBssid.getBssid() != null) {
ssidStatistics.setBssid(MacAddress.valueOf(activeBssid.getBssid()));
return activeBssid.getBssid();
}
}
}
}
}
}
} catch (Exception e) {
LOG.error("Could not get active BSSIDs for apId {} radioType {}", apId, radioType, e);
}
if (LOG.isTraceEnabled())
LOG.trace("Client Report Date is {}", new Date(clientReport.getTimestampMs()));
int numConnectedClients = 0;
for (Client client : clientReport.getClientListList()) {
if (client.hasStats()) {
if (client.hasSsid()) {
ssid = client.getSsid();
}
if (client.hasMacAddress()) {
clientMacs.add(client.getMacAddress());
} else {
continue; // cannot have a session without a MAC address
}
rxBytes += client.getStats().getRxBytes();
txBytes += client.getStats().getTxBytes();
rxFrames += client.getStats().getRxFrames();
txFrames += client.getStats().getTxFrames();
rxRetries += client.getStats().getRxRetries();
txRetries += client.getStats().getTxRetries();
rxErrors += client.getStats().getRxErrors();
txErrors += client.getStats().getTxErrors();
lastRssi = client.getStats().getRssi();
if (client.hasConnected() && client.getConnected() && client.hasMacAddress()) {
numConnectedClients += 1;
}
}
}
ssidStatistics.setRxLastRssi(lastRssi);
ssidStatistics.setNumRxData(Long.valueOf(rxFrames).intValue());
ssidStatistics.setRxBytes(rxBytes - rxErrors - rxRetries);
ssidStatistics.setNumTxDataRetries(txRetries);
ssidStatistics.setNumRcvFrameForTx(txFrames);
ssidStatistics.setNumTxBytesSucc(txBytes - txErrors - txRetries);
ssidStatistics.setNumRxRetry(rxRetries);
ssidStatistics.setNumClient(numConnectedClients);
ssidStatistics.setSsid(ssid);
if (radioType != null) {
List<SsidStatistics> ssidStatsList = apSsidMetrics.getSsidStats().get(radioType);
if (ssidStatsList == null) {
ssidStatsList = new ArrayList<>();
}
ssidStatsList.add(ssidStatistics);
apSsidMetrics.getSsidStats().put(radioType, ssidStatsList);
}
} catch (Exception e) {
LOG.error("Could not get active BSSIDs for apId {} radioType {}", apId, radioType, e);
}
LOG.debug("ApSsidMetrics {}", apSsidMetrics);
return null;
}
ChannelInfo createChannelInfo(long equipmentId, RadioType radioType, List<SurveySample> surveySampleList, ChannelBandwidth channelBandwidth) {