From 42a520b7ec75d65c98fe15fbb3fe797a911fdb27 Mon Sep 17 00:00:00 2001 From: Mike Hansen Date: Thu, 8 Jul 2021 19:10:26 -0400 Subject: [PATCH] MQTT message delivery reliability - MqttStatsPublisher measure time for overall and per report metrics generation Signed-off-by: Mike Hansen --- .../integration/utils/MqttStatsPublisher.java | 259 ++++++------------ .../utils/RealtimeEventPublisher.java | 25 +- .../opensync/mqtt/OpensyncMqttClient.java | 76 ++--- 3 files changed, 122 insertions(+), 238 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 68340f7..aa65bb9 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -13,18 +13,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.TypeRegistry; import com.telecominfraproject.wlan.alarm.AlarmServiceInterface; import com.telecominfraproject.wlan.alarm.models.Alarm; import com.telecominfraproject.wlan.alarm.models.AlarmCode; @@ -73,10 +72,10 @@ import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfo import com.telecominfraproject.wlan.servicemetric.channelinfo.models.ChannelInfoReports; import com.telecominfraproject.wlan.servicemetric.client.models.ClientMetrics; import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric; +import com.telecominfraproject.wlan.servicemetric.models.ServiceMetricDataType; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports; import com.telecominfraproject.wlan.status.StatusServiceInterface; -import com.telecominfraproject.wlan.status.equipment.models.EquipmentAdminStatusData; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSID; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSIDs; import com.telecominfraproject.wlan.status.equipment.report.models.EquipmentCapacityDetails; @@ -117,10 +116,6 @@ import sts.OpensyncStats.Survey; import sts.OpensyncStats.Survey.SurveySample; import sts.OpensyncStats.SurveyType; import sts.OpensyncStats.VLANMetrics; -import traffic.NetworkMetadata; -import traffic.NetworkMetadata.FlowReport; -import wc.stats.IpDnsTelemetry; -import wc.stats.IpDnsTelemetry.WCStatsReport; @org.springframework.context.annotation.Profile("opensync_cloud_config") @Component @@ -154,13 +149,18 @@ public class MqttStatsPublisher implements StatsPublisherInterface { @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") private int memoryUtilThresholdPct; + @Value("${tip.wlan.mqttStatsPublisher.reportProcessingThresholdSec:30}") + public int reportProcessingThresholdSec; + @Override + @Async public void processMqttMessage(String topic, Report report) { - LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); + long startTime = System.nanoTime(); String apId = extractApIdFromTopic(topic); + LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId); if (ce == null) { - LOG.warn("Cannot read equipment {}", apId); + LOG.warn("Cannot get equipment for inventoryId {}. Ignore mqtt message for topic {}", apId, topic); return; } @@ -172,14 +172,37 @@ public class MqttStatsPublisher implements StatsPublisherInterface { List metricRecordList = new ArrayList<>(); try { + long clientMetricsStart = System.nanoTime(); populateApClientMetrics(metricRecordList, report, customerId, equipmentId, locationId); + long clientMetricsStop = System.nanoTime(); + LOG.debug("Elapsed time for constructing Client metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(clientMetricsStop - clientMetricsStart, TimeUnit.NANOSECONDS), topic); + + long nodeMetricsStart = System.nanoTime(); populateApNodeMetrics(metricRecordList, report, customerId, equipmentId, locationId); + long nodeMetricsStop = System.nanoTime(); + LOG.debug("Elapsed time for constructing ApNode metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(nodeMetricsStop - nodeMetricsStart, TimeUnit.NANOSECONDS), topic); + + long neighbourScanStart = System.nanoTime(); populateNeighbourScanReports(metricRecordList, report, customerId, equipmentId, locationId); + long neighbourScanStop = System.nanoTime(); + LOG.debug("Elapsed time for constructing Neighbour metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(neighbourScanStop - neighbourScanStart, TimeUnit.NANOSECONDS), topic); + + long channelInfoStart = System.nanoTime(); populateChannelInfoReports(metricRecordList, report, customerId, equipmentId, locationId, profileId); + long channelInfoStop = System.nanoTime(); + LOG.debug("Elapsed time for constructing Channel metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(channelInfoStop - channelInfoStart, TimeUnit.NANOSECONDS), topic); + + long ssidStart = System.nanoTime(); populateApSsidMetrics(metricRecordList, report, customerId, equipmentId, apId, locationId); + long ssidStop = System.nanoTime(); + LOG.debug("Elapsed time for constructing ApSsid metrics record was {} milliseconds for topic {}", + TimeUnit.MILLISECONDS.convert(ssidStop - ssidStart, TimeUnit.NANOSECONDS), topic); if (!metricRecordList.isEmpty()) { - long serviceMetricTimestamp = System.currentTimeMillis(); LOG.debug("Current timestamp for service metrics is {}", serviceMetricTimestamp); metricRecordList.stream().forEach(smr -> { @@ -191,12 +214,33 @@ public class MqttStatsPublisher implements StatsPublisherInterface { if (smr.getEquipmentId() == 0L) smr.setEquipmentId(equipmentId); }); - metricRecordList.stream().forEach(smr -> { - LOG.debug("ServiceMetric {}", smr); - }); + long publishStart = System.nanoTime(); cloudEventDispatcherInterface.publishMetrics(metricRecordList); + long publishStop = System.nanoTime(); + LOG.debug("Elapsed publishing time for metrics records from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(publishStop - publishStart, TimeUnit.NANOSECONDS)); } + + long mqttEventsStart = System.nanoTime(); publishEvents(report, customerId, equipmentId, apId, locationId); + long mqttEventsStop = System.nanoTime(); + LOG.debug("Elapsed publishing time for mqtt events from AP {} is {} milliseconds", apId, + TimeUnit.MILLISECONDS.convert(mqttEventsStop - mqttEventsStart, TimeUnit.NANOSECONDS)); + + long endTime = System.nanoTime(); + long elapsedTimeMillis = TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS); + long elapsedTimeSeconds = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS); + + if (elapsedTimeSeconds > reportProcessingThresholdSec) { + LOG.warn("Processing threshold exceeded for stats messages from AP {}. Elapsed processing time {} seconds. Report: {}", apId, + elapsedTimeSeconds, report); + } else { + if (elapsedTimeSeconds < 1) { + LOG.debug("Total elapsed processing time {} milliseconds for stats messages from AP {}", elapsedTimeMillis, apId); + } else { + LOG.debug("Total elapsed processing time {} seconds for stats messages from AP {}", elapsedTimeSeconds, apId); + } + } } catch (Exception e) { LOG.error("Exception when processing stats messages from AP", e); } @@ -211,69 +255,46 @@ public class MqttStatsPublisher implements StatsPublisherInterface { void publishEvents(Report report, int customerId, long equipmentId, String apId, long locationId) { + // asynchronous realtimeEventPublisher.publishSipCallEvents(customerId, equipmentId, locationId, report.getVideoVoiceReportList()); for (EventReport eventReport : report.getEventReportList()) { for (sts.OpensyncStats.EventReport.ClientSession apEventClientSession : eventReport.getClientSessionList()) { - LOG.debug("Processing EventReport::ClientSession {}", apEventClientSession); - + LOG.debug("Processing EventReport::ClientSession for AP {}", apId); + // for the following MQTT events, the client/client session is first updated, then the real time event is published, asynchronously. if (apEventClientSession.hasClientAuthEvent()) { processClientAuthEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientAuthSystemEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAuthEvent()); } - if (apEventClientSession.hasClientAssocEvent()) { processClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAssocEvent()); } - if (apEventClientSession.hasClientFirstDataEvent()) { processClientFirstDataEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientFirstDataEvent(customerId, equipmentId, locationId, apEventClientSession.getClientFirstDataEvent()); - } - if (apEventClientSession.hasClientIdEvent()) { processClientIdEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientIdEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIdEvent()); - } - if (apEventClientSession.hasClientIpEvent()) { processClientIpEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent()); - } - if (apEventClientSession.hasClientConnectEvent()) { processClientConnectEvent(customerId, equipmentId, locationId, eventReport, apEventClientSession); - realtimeEventPublisher.publishClientConnectSuccessEvent(customerId, equipmentId, locationId, apEventClientSession.getClientConnectEvent()); - } - if (apEventClientSession.hasClientDisconnectEvent()) { processClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession.getClientDisconnectEvent()); } - if (apEventClientSession.hasClientTimeoutEvent()) { processClientTimeoutEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientTimeoutEvent(customerId, equipmentId, locationId, apEventClientSession.getClientTimeoutEvent()); - } - if (apEventClientSession.hasClientFailureEvent()) { processClientFailureEvent(customerId, equipmentId, locationId, apEventClientSession); - realtimeEventPublisher.publishClientFailureEvent(customerId, equipmentId, locationId, apEventClientSession.getClientFailureEvent()); - } - } - + // asynchronous realtimeEventPublisher.publishChannelHopEvents(customerId, equipmentId, locationId, eventReport); - + // asynchronous realtimeEventPublisher.publishDhcpTransactionEvents(customerId, equipmentId, locationId, eventReport.getDhcpTransactionList()); } @@ -381,12 +402,9 @@ public class MqttStatsPublisher implements StatsPublisherInterface { if (ipAddress != null) { try { InetAddress inetAddress = InetAddress.getByAddress(ipAddress.toByteArray()); - LOG.info("ipAddr {} ", inetAddress); if (inetAddress instanceof Inet4Address) { - LOG.info("IPv4 address {}", inetAddress); clientSession.getDetails().setIpAddress(inetAddress); } else if (inetAddress instanceof Inet6Address) { - LOG.info("IPv6 address {}", inetAddress); clientSession.getDetails().setIpAddress(inetAddress); } else { LOG.error("Invalid IP Address {}", ipAddress); @@ -404,6 +422,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientConnectSuccessEvent(customerId, equipmentId, locationId, apEventClientSession.getClientConnectEvent()); } @@ -480,6 +499,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setAssocTimestamp(apClientEvent.getTimestampMs()); clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession.getClientDisconnectEvent()); } @@ -527,6 +547,9 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setAssociationState(AssociationState._802_11_Authenticated); clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + + realtimeEventPublisher.publishClientAuthSystemEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAuthEvent()); + } protected void processClientAssocEvent(int customerId, long equipmentId, long locationId, @@ -594,6 +617,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setAssociationState(AssociationState._802_11_Associated); clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession.getClientAssocEvent()); + } protected void processClientFailureEvent(int customerId, long equipmentId, long locationId, @@ -641,7 +666,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setLastFailureDetails(clientFailureDetails); clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); - + realtimeEventPublisher.publishClientFailureEvent(customerId, equipmentId, locationId, apEventClientSession.getClientFailureEvent()); } protected void processClientFirstDataEvent(int customerId, long equipmentId, long locationId, @@ -688,6 +713,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientFirstDataEvent(customerId, equipmentId, locationId, apEventClientSession.getClientFirstDataEvent()); + } protected void processClientIdEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { @@ -732,6 +759,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientIdEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIdEvent()); } protected void processClientIpEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) { @@ -774,10 +802,8 @@ public class MqttStatsPublisher implements StatsPublisherInterface { try { InetAddress inetAddress = InetAddress.getByAddress(ipAddress.toByteArray()); if (inetAddress instanceof Inet4Address) { - LOG.info("IPv4 address {}", inetAddress); clientSession.getDetails().setIpAddress(inetAddress); } else if (inetAddress instanceof Inet6Address) { - LOG.info("IPv6 address {}", inetAddress); clientSession.getDetails().setIpAddress(inetAddress); } else { LOG.error("Invalid IP Address {}", ipAddress); @@ -790,6 +816,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent()); } protected void processClientTimeoutEvent(int customerId, long equipmentId, long locationId, @@ -836,6 +863,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { clientSession.getDetails().setAssociationState(AssociationState.AP_Timeout); clientSession.getDetails().setLastEventTimestamp(apClientEvent.getTimestampMs()); clientSession = clientServiceInterface.updateSession(clientSession); + realtimeEventPublisher.publishClientTimeoutEvent(customerId, equipmentId, locationId, apEventClientSession.getClientTimeoutEvent()); } void populateApNodeMetrics(List metricRecordList, Report report, int customerId, long equipmentId, long locationId) { @@ -844,7 +872,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface { ServiceMetric smr = new ServiceMetric(customerId, equipmentId); smr.setLocationId(locationId); - metricRecordList.add(smr); smr.setDetails(apNodeMetrics); @@ -894,7 +921,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { double usedMemory = deviceReport.getMemUtil().getMemUsed(); double totalMemory = deviceReport.getMemUtil().getMemTotal(); - if (usedMemory/totalMemory * 100 > memoryUtilThresholdPct) { + if (usedMemory / totalMemory * 100 > memoryUtilThresholdPct) { raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs()); } else { // Clear any existing cpuUtilization alarms @@ -1144,13 +1171,15 @@ public class MqttStatsPublisher implements StatsPublisherInterface { updateDeviceStatusRadioUtilizationReport(customerId, equipmentId, radioUtilizationReport); } + @Async void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode) { alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)).stream().forEach(a -> { Alarm alarm = alarmServiceInterface.delete(customerId, equipmentId, a.getAlarmCode(), a.getCreatedTimestamp()); - LOG.info("Cleared device threshold alarm {}", alarm); + LOG.debug("Cleared device threshold alarm {}", alarm); }); } - + + @Async void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs) { // Raise an alarm for temperature Alarm alarm = new Alarm(); @@ -1170,9 +1199,9 @@ public class MqttStatsPublisher implements StatsPublisherInterface { if (alarms.isEmpty()) { alarm.setCreatedTimestamp(timestampMs); alarm = alarmServiceInterface.create(alarm); - } + } } - + private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) { if (checkedValue > 100 || checkedValue < 0) { LOG.warn( @@ -1186,7 +1215,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { private void updateNetworkAdminStatusReport(int customerId, long equipmentId, ApNodeMetrics apNodeMetrics) { apNodeMetrics.getNetworkProbeMetrics().forEach(n -> { - LOG.info("Update NetworkAdminStatusReport for NetworkProbeMetrics {}", n.toString()); + LOG.debug("Update NetworkAdminStatusReport for NetworkProbeMetrics {}", n.toString()); Status networkAdminStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.NETWORK_ADMIN); @@ -1202,7 +1231,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { NetworkAdminStatusData statusData = (NetworkAdminStatusData) networkAdminStatus.getDetails(); if (n.getDnsState() == null) { - LOG.debug("No DnsState present in networkProbeMetrics, DnsState and CloudLinkStatus set to 'normal"); + LOG.trace("No DnsState present in networkProbeMetrics, DnsState and CloudLinkStatus set to 'normal"); statusData.setDnsStatus(StatusCode.normal); statusData.setCloudLinkStatus(StatusCode.normal); } else { @@ -1210,13 +1239,13 @@ public class MqttStatsPublisher implements StatsPublisherInterface { statusData.setCloudLinkStatus(stateUpDownErrorToStatusCode(n.getDnsState())); } if (n.getDhcpState() == null) { - LOG.debug("No DhcpState present in networkProbeMetrics, set to 'normal"); + LOG.trace("No DhcpState present in networkProbeMetrics, set to 'normal"); statusData.setDhcpStatus(StatusCode.normal); } else { statusData.setDhcpStatus(stateUpDownErrorToStatusCode(n.getDhcpState())); } if (n.getRadiusState() == null) { - LOG.debug("No RadiusState present in networkProbeMetrics, set to 'normal"); + LOG.trace("No RadiusState present in networkProbeMetrics, set to 'normal"); statusData.setRadiusStatus(StatusCode.normal); } else { statusData.setRadiusStatus(stateUpDownErrorToStatusCode(n.getRadiusState())); @@ -1251,9 +1280,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { void updateDeviceStatusRadioUtilizationReport(int customerId, long equipmentId, RadioUtilizationReport radioUtilizationReport) { LOG.info("Processing updateDeviceStatusRadioUtilizationReport for equipmentId {} with RadioUtilizationReport {}", equipmentId, radioUtilizationReport); - Status radioUtilizationStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.RADIO_UTILIZATION); - if (radioUtilizationStatus == null) { LOG.debug("Create new radioUtilizationStatus"); radioUtilizationStatus = new Status(); @@ -1261,9 +1288,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { radioUtilizationStatus.setEquipmentId(equipmentId); radioUtilizationStatus.setStatusDataType(StatusDataType.RADIO_UTILIZATION); } - radioUtilizationStatus.setDetails(radioUtilizationReport); - statusServiceInterface.update(radioUtilizationStatus); } @@ -1271,32 +1296,23 @@ public class MqttStatsPublisher implements StatsPublisherInterface { List networkProbeMetricsList = new ArrayList<>(); for (NetworkProbe networkProbe : report.getNetworkProbeList()) { - NetworkProbeMetrics networkProbeMetrics = new NetworkProbeMetrics(); - networkProbeMetrics.setSourceTimestampMs(networkProbe.getTimestampMs()); - List dnsProbeResults = new ArrayList<>(); if (networkProbe.hasDnsProbe()) { - DNSProbeMetric dnsProbeMetricFromAp = networkProbe.getDnsProbe(); - DnsProbeMetric cloudDnsProbeMetric = new DnsProbeMetric(); - if (dnsProbeMetricFromAp.hasLatency()) { networkProbeMetrics.setDnsLatencyMs(dnsProbeMetricFromAp.getLatency()); cloudDnsProbeMetric.setDnsLatencyMs(dnsProbeMetricFromAp.getLatency()); } - if (dnsProbeMetricFromAp.hasState()) { StateUpDownError dnsState = OvsdbToWlanCloudTypeMappingUtility.getCloudMetricsStateFromOpensyncStatsStateUpDown(dnsProbeMetricFromAp.getState()); networkProbeMetrics.setDnsState(dnsState); cloudDnsProbeMetric.setDnsState(dnsState); - } - if (dnsProbeMetricFromAp.hasServerIP()) { InetAddress ipAddress; try { @@ -1306,9 +1322,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { LOG.error("Could not get DNS Server IP from network_probe service_metrics_collection_config", e); } } - dnsProbeResults.add(cloudDnsProbeMetric); - } networkProbeMetrics.setDnsProbeResults(dnsProbeResults); @@ -1317,15 +1331,11 @@ public class MqttStatsPublisher implements StatsPublisherInterface { if (radiusMetrics.hasLatency()) { networkProbeMetrics.setRadiusLatencyInMs(radiusMetrics.getLatency()); } - if (radiusMetrics.hasRadiusState()) { StateUpDownError radiusState = OvsdbToWlanCloudTypeMappingUtility.getCloudMetricsStateFromOpensyncStatsStateUpDown(radiusMetrics.getRadiusState()); - networkProbeMetrics.setRadiusState(radiusState); - } - } if (networkProbe.hasVlanProbe()) { @@ -1344,9 +1354,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { networkProbeMetrics.setDhcpLatencyMs(vlanMetrics.getLatency()); } } - networkProbeMetricsList.add(networkProbeMetrics); - } apNodeMetrics.setNetworkProbeMetrics(networkProbeMetricsList); @@ -1364,11 +1372,9 @@ public class MqttStatsPublisher implements StatsPublisherInterface { eqOsPerformance.setTotalAvailableMemoryKb(deviceReport.getMemUtil().getMemTotal()); status.setDetails(eqOsPerformance); status = statusServiceInterface.update(status); - LOG.debug("updated status {}", status); + LOG.trace("updated status {}", status); } - - void populateApClientMetrics(List metricRecordList, Report report, int customerId, long equipmentId, long locationId) { LOG.info("populateApClientMetrics for Customer {} Equipment {}", customerId, equipmentId); @@ -1376,7 +1382,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { for (Client cl : clReport.getClientListList()) { if (cl.getMacAddress() == null) { - LOG.debug("No mac address for Client {}, cannot set device mac address for client in ClientMetrics.", cl); + LOG.trace("No mac address for Client {}, cannot set device mac address for client in ClientMetrics.", cl); continue; } @@ -1527,91 +1533,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } } - ClientSession handleClientSessionMetricsUpdate(int customerId, long equipmentId, long locationId, RadioType radioType, long timestamp, - sts.OpensyncStats.Client client) { - try - - { - LOG.info("handleClientSessionUpdate for customerId {} equipmentId {} locationId {} client {} ", customerId, equipmentId, locationId, - client.getMacAddress()); - - ClientSession clientSession = clientServiceInterface.getSessionOrNull(customerId, equipmentId, MacAddress.valueOf(client.getMacAddress())); - - if (clientSession == null) { - LOG.warn("Cannot get client session for {}", client.getMacAddress()); - return null; - } - - ClientSessionDetails latestClientSessionDetails = clientSession.getDetails(); - latestClientSessionDetails.setMetricDetails(calculateClientSessionMetricDetails(client, timestamp)); - - clientSession.getDetails().mergeSession(latestClientSessionDetails); - clientSession.setLastModifiedTimestamp(timestamp); - - clientSession = clientServiceInterface.updateSession(clientSession); - - LOG.debug("Updated client session {}", clientSession); - - return clientSession; - } catch (Exception e) { - LOG.error("Error while attempting to create ClientSession and Info", e); - } - return null; - } - - ClientSessionMetricDetails calculateClientSessionMetricDetails(sts.OpensyncStats.Client client, long timestamp) { - - LOG.info("calculateClientSessionMetricDetails for Client {} at timestamp {}", client.getMacAddress(), timestamp); - - ClientSessionMetricDetails metricDetails = new ClientSessionMetricDetails(); - - if (LOG.isDebugEnabled()) - LOG.debug("Stats: {} DurationMs {}", client.getStats(), client.getDurationMs()); - int rssi = client.getStats().getRssi(); - metricDetails.setRssi(rssi); - metricDetails.setRxBytes(client.getStats().getRxBytes()); - metricDetails.setTxBytes(client.getStats().getTxBytes()); - - // Frames : data chunk sent over data-link layer (Ethernet, ATM) - // Packets : data chunk sent over IP layer. - // in Wifi, these are the same size, so number of packets is equal to - // number of frames - metricDetails.setTotalTxPackets(client.getStats().getTxFrames()); - metricDetails.setTotalRxPackets(client.getStats().getRxFrames()); - metricDetails.setTxDataFrames((int) client.getStats().getTxFrames()); - metricDetails.setRxDataFrames((int) client.getStats().getRxFrames()); - - metricDetails.setRxRateKbps((long) client.getStats().getRxRate()); - metricDetails.setTxRateKbps((long) client.getStats().getTxRate()); - if (LOG.isDebugEnabled()) - LOG.debug("RxRateKbps {} TxRateKbps {}", metricDetails.getRxRateKbps(), metricDetails.getTxRateKbps()); - - // Throughput, do rate / duration - if (client.getDurationMs() > 1000) { - int durationSec = client.getDurationMs() / 1000; - // 1 Mbit = 125000 B - - float rxBytesFv = Long.valueOf(client.getStats().getRxBytes()).floatValue(); - float rxBytesToMb = rxBytesFv / 125000F; - float txBytesFv = Long.valueOf(client.getStats().getTxBytes()).floatValue(); - float txBytesToMb = txBytesFv / 125000F; - - if (LOG.isDebugEnabled()) - LOG.debug("rxBytesToMb {} txBytesToMb {} ", rxBytesToMb, txBytesToMb); - - metricDetails.setRxMbps(rxBytesToMb / durationSec); - metricDetails.setTxMbps(txBytesToMb / durationSec); - if (LOG.isDebugEnabled()) - LOG.debug("RxMbps {} TxMbps {} ", metricDetails.getRxMbps(), metricDetails.getTxMbps()); - - } else { - LOG.warn("Cannot calculate tx/rx throughput for Client {} based on duration of {} Ms", client.getMacAddress(), client.getDurationMs()); - } - metricDetails.setLastMetricTimestamp(timestamp); - - return metricDetails; - } - void populateApSsidMetrics(List metricRecordList, Report report, int customerId, long equipmentId, String apId, long locationId) { LOG.info("populateApSsidMetrics for Customer {} Equipment {}", customerId, equipmentId); @@ -1660,7 +1581,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface { } } } - LOG.debug("Client Report Date is {}", new Date(clientReport.getTimestampMs())); + if (LOG.isTraceEnabled()) LOG.trace("Client Report Date is {}", new Date(clientReport.getTimestampMs())); int numConnectedClients = 0; for (Client client : clientReport.getClientListList()) { if (client.hasStats()) { diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/RealtimeEventPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/RealtimeEventPublisher.java index e0852bb..2acb20e 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/RealtimeEventPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/RealtimeEventPublisher.java @@ -8,6 +8,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.telecominfraproject.wlan.client.models.events.realtime.ClientConnectSuccessEvent; @@ -89,7 +90,8 @@ public class RealtimeEventPublisher { private ProfileServiceInterface profileServiceInterface; private static final Logger LOG = LoggerFactory.getLogger(RealtimeEventPublisher.class); - + + @Async void publishChannelHopEvents(int customerId, long equipmentId, long locationId, EventReport e) { LOG.info("publishChannelHopEvents for customerId {} equipmentId {}"); @@ -178,6 +180,7 @@ public class RealtimeEventPublisher { } } + @Async void publishClientConnectSuccessEvent(int customerId, long equipmentId, long locationId, ClientConnectEvent clientConnectEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientConnectEvent, customerId, @@ -266,7 +269,7 @@ public class RealtimeEventPublisher { cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientDisconnectEvent(int customerId, long equipmentId, long locationId, ClientDisconnectEvent clientDisconnectEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientDisconnectEvent, customerId, @@ -316,7 +319,8 @@ public class RealtimeEventPublisher { cloudEventDispatcherInterface.publishEvent(clientEvent); } - + + @Async void publishClientAuthSystemEvent(int customerId, long equipmentId, long locationId, ClientAuthEvent clientAuthEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientAuthEvent, customerId, equipmentId); @@ -340,7 +344,8 @@ public class RealtimeEventPublisher { cloudEventDispatcherInterface.publishEvent(clientEvent); } - + + @Async void publishClientAssocEvent(int customerId, long equipmentId, long locationId, ClientAssocEvent clientAssocEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientAssocEvent, customerId, equipmentId); @@ -390,7 +395,7 @@ public class RealtimeEventPublisher { cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientFailureEvent(int customerId, long equipmentId, long locationId, ClientFailureEvent clientFailureEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientFailureEvent, customerId, equipmentId); @@ -417,7 +422,7 @@ public class RealtimeEventPublisher { LOG.info("publishing client event {} to cloud", clientEvent); cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientFirstDataEvent(int customerId, long equipmentId, long locationId, ClientFirstDataEvent clientFirstDataEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientFirstDataEvent, customerId, equipmentId); @@ -444,7 +449,7 @@ public class RealtimeEventPublisher { cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientIdEvent(int customerId, long equipmentId, long locationId, ClientIdEvent clientIdEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientIdEvent, customerId, equipmentId); @@ -465,7 +470,7 @@ public class RealtimeEventPublisher { LOG.info("publishing client event {} to cloud", clientEvent); cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientIpEvent(int customerId, long equipmentId, long locationId, ClientIpEvent clientIpEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientIpEvent, customerId, equipmentId); @@ -491,7 +496,7 @@ public class RealtimeEventPublisher { LOG.info("publishing client event {} to cloud", clientEvent); cloudEventDispatcherInterface.publishEvent(clientEvent); } - + @Async void publishClientTimeoutEvent(int customerId, long equipmentId, long locationId, ClientTimeoutEvent clientTimeoutEvent) { LOG.info("Received ClientEvent {} for customerId {} equipmentId {}", clientTimeoutEvent, customerId, @@ -523,6 +528,7 @@ public class RealtimeEventPublisher { } + @Async void publishDhcpTransactionEvents(int customerId, long equipmentId, long locationId, List dhcpTransactionList) { LOG.info("Publish Dhcp Transaction Events for customer {} equipmentId {}", customerId, equipmentId); List dhcpEventsList = new ArrayList<>(); @@ -803,6 +809,7 @@ public class RealtimeEventPublisher { return cloudDhcpEvent; } + @Async void publishSipCallEvents(int customerId, long equipmentId, long locationId, List sipCallReportList) { // only in case it is not there, we will just use the time when we // received the report/event diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index 12055f5..fb274f7 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -5,7 +5,6 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -13,19 +12,9 @@ import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; -import org.fusesource.mqtt.codec.CONNACK; -import org.fusesource.mqtt.codec.CONNECT; -import org.fusesource.mqtt.codec.DISCONNECT; import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.PINGREQ; import org.fusesource.mqtt.codec.PINGRESP; -import org.fusesource.mqtt.codec.PUBACK; -import org.fusesource.mqtt.codec.PUBCOMP; -import org.fusesource.mqtt.codec.PUBLISH; -import org.fusesource.mqtt.codec.PUBREC; -import org.fusesource.mqtt.codec.PUBREL; -import org.fusesource.mqtt.codec.SUBACK; -import org.fusesource.mqtt.codec.SUBSCRIBE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,7 +25,6 @@ import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TypeRegistry; import com.google.protobuf.util.JsonFormat; import com.netflix.servo.DefaultMonitorRegistry; @@ -48,7 +36,6 @@ import com.netflix.servo.monitor.Stopwatch; import com.netflix.servo.monitor.Timer; import com.netflix.servo.tag.TagList; import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags; -import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface; import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface; import com.telecominfraproject.wlan.opensync.util.ZlibUtil; @@ -59,14 +46,19 @@ import sts.OpensyncStats.Report; @Component public class OpensyncMqttClient implements ApplicationListener { - public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30; + // private static final String METRICS_WKR_PFX = "metrics-wkr-"; + + // public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30; private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class); private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA"); - + private static final Logger MQTT_TRACER_LOG = LoggerFactory.getLogger("MQTT_CLIENT_TRACER"); + // private static final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new + // CustomizableThreadFactory(METRICS_WKR_PFX)); + public static Charset utf8 = Charset.forName("UTF-8"); private final TagList tags = CloudMetricsTags.commonTags; @@ -128,7 +120,7 @@ public class OpensyncMqttClient implements ApplicationListener REPORT_PROCESSING_THRESHOLD_SEC) { - try { - LOG.warn("Processing threshold exceeded for {}. Elapsed processing time {} seconds. MqttMessage: {}",mqttMsg.getTopic().split("/")[2], - elapsedTimeSeconds, jsonPrinter.print(statsReport)); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error parsing Report from protobuffer for Topic {}", mqttMsg.getTopic(), e); - } - } - if (LOG.isTraceEnabled()) - LOG.trace("Completed publishing service metrics from received mqtt stats"); - } - }; - sender.start(); + MQTT_LOG.info("Topic {}\n{}", mqttMsg.getTopic(), jsonPrinter.print(statsReport)); + statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport); + LOG.debug("Dispatched report for topic {} to backend for processing", mqttMsg.getTopic()); } catch (Exception e) { String msgStr = new String(mqttMsg.getPayload(), utf8);