From 11adb02fd2681c5270b6f73e5e448285665bee98 Mon Sep 17 00:00:00 2001 From: Mike Hansen Date: Tue, 22 Jun 2021 11:12:59 -0400 Subject: [PATCH] Move the raising and clearing of the threshold alarms into the gateway controller when the device information is received in the MQTT device report. Update EquipmentAdminState alarms or clear if required. Signed-off-by: Mike Hansen --- .../integration/utils/MqttStatsPublisher.java | 158 ++++++++++++++++-- .../opensync/mqtt/OpensyncMqttClient.java | 29 +++- 2 files changed, 163 insertions(+), 24 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 9d2504f..aefc62e 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -6,6 +6,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -16,6 +17,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.google.protobuf.ByteString; @@ -23,6 +25,12 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.TypeRegistry; +import com.telecominfraproject.wlan.alarm.AlarmServiceInterface; +import com.telecominfraproject.wlan.alarm.models.Alarm; +import com.telecominfraproject.wlan.alarm.models.AlarmCode; +import com.telecominfraproject.wlan.alarm.models.AlarmDetails; +import com.telecominfraproject.wlan.alarm.models.AlarmScopeType; +import com.telecominfraproject.wlan.alarm.models.OriginatorType; import com.telecominfraproject.wlan.client.ClientServiceInterface; import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails; import com.telecominfraproject.wlan.client.session.models.AssociationState; @@ -68,6 +76,7 @@ import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports; import com.telecominfraproject.wlan.status.StatusServiceInterface; +import com.telecominfraproject.wlan.status.equipment.models.EquipmentAdminStatusData; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSID; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSIDs; import com.telecominfraproject.wlan.status.equipment.report.models.EquipmentCapacityDetails; @@ -133,6 +142,17 @@ public class MqttStatsPublisher { private CloudEventDispatcherInterface cloudEventDispatcherInterface; @Autowired private RealtimeEventPublisher realtimeEventPublisher; + @Autowired + private AlarmServiceInterface alarmServiceInterface; + + @Value("${tip.wlan.mqttStatsPublisher.temperatureThresholdInC:80}") + private int temperatureThresholdInC; + + @Value("${tip.wlan.mqttStatsPublisher.cpuUtilThresholdPct:80}") + private int cpuUtilThresholdPct; + + @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") + private int memoryUtilThresholdPct; public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { LOG.info("Received WCStatsReport {}", wcStatsReport.toString()); @@ -222,16 +242,19 @@ public class MqttStatsPublisher { LOG.debug("Current timestamp for service metrics is {}", serviceMetricTimestamp); metricRecordList.stream().forEach(smr -> { smr.setCreatedTimestamp(serviceMetricTimestamp); - if (smr.getLocationId() == 0) smr.setLocationId(locationId); - if(smr.getCustomerId() == 0) smr.setCustomerId(customerId); - if (smr.getEquipmentId() == 0L) smr.setEquipmentId(equipmentId); + if (smr.getLocationId() == 0) + smr.setLocationId(locationId); + if (smr.getCustomerId() == 0) + smr.setCustomerId(customerId); + if (smr.getEquipmentId() == 0L) + smr.setEquipmentId(equipmentId); }); metricRecordList.stream().forEach(smr -> { LOG.debug("ServiceMetric {}", smr); }); cloudEventDispatcherInterface.publishMetrics(metricRecordList); } - + publishEvents(report, customerId, equipmentId, apId, locationId); // handleRssiMetrics(metricRecordList, report, customerId, // equipmentId, locationId); @@ -938,22 +961,41 @@ public class MqttStatsPublisher { numSamples++; } } - if (numSamples > 0) { avgRadioTemp = Math.round((cpuTemperature / numSamples)); apPerformance.setCpuTemperature(avgRadioTemp); } + if (avgRadioTemp > temperatureThresholdInC) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, deviceReport.getTimestampMs()); + } else { + // Clear any existing temperature alarms for this ap + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature); + } } if (deviceReport.hasCpuUtil() && deviceReport.getCpuUtil().hasCpuUtil()) { Integer cpuUtilization = deviceReport.getCpuUtil().getCpuUtil(); apPerformance.setCpuUtilized(new int[] {cpuUtilization}); + if (cpuUtilization > cpuUtilThresholdPct) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, deviceReport.getTimestampMs()); + } else { + // Clear any existing cpuUtilization alarms + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization); + } } apPerformance.setEthLinkState(EthernetLinkState.UP1000_FULL_DUPLEX); if (deviceReport.hasMemUtil() && deviceReport.getMemUtil().hasMemTotal() && deviceReport.getMemUtil().hasMemUsed()) { apPerformance.setFreeMemory(deviceReport.getMemUtil().getMemTotal() - deviceReport.getMemUtil().getMemUsed()); + + if (deviceReport.getMemUtil().getMemUsed()/deviceReport.getMemUtil().getMemTotal() > memoryUtilThresholdPct) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs()); + } else { + // Clear any existing cpuUtilization alarms + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization); + } + } apPerformance.setUpTime((long) deviceReport.getUptime()); @@ -969,7 +1011,6 @@ public class MqttStatsPublisher { if (apNodeMetrics.getSourceTimestampMs() < deviceReport.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(deviceReport.getTimestampMs()); updateDeviceStatusForReport(customerId, equipmentId, deviceReport, avgRadioTemp); - } // statusList.add(status); @@ -1067,11 +1108,11 @@ public class MqttStatsPublisher { radioStats.setNumRxErr(rxErrors); radioStats.setNumRxRetry(rxRetries); radioStats.setSourceTimestampMs(clReport.getTimestampMs()); - + // The service metric report's sourceTimestamp will be the most recent timestamp from its contributing stats if (apNodeMetrics.getSourceTimestampMs() < clReport.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(clReport.getTimestampMs()); - + apNodeMetrics.setRadioStats(radioType, radioStats); apNodeMetrics.setRxBytes(radioType, rxBytes); @@ -1132,11 +1173,12 @@ public class MqttStatsPublisher { RadioUtilization radioUtil = new RadioUtilization(); radioUtil.setTimestampSeconds((int) ((survey.getTimestampMs()) / 1000)); radioUtil.setSourceTimestampMs(survey.getTimestampMs()); - - // The service metric report's sourceTimestamp will be the most recent timestamp from its contributing stats + + // The service metric report's sourceTimestamp will be the most recent timestamp from its + // contributing stats if (apNodeMetrics.getSourceTimestampMs() < survey.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(survey.getTimestampMs()); - + int pctBusyTx = busyTx / totalDurationMs; checkIfOutOfBound("pctBusyTx", pctBusyTx, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); @@ -1150,7 +1192,7 @@ public class MqttStatsPublisher { int nonWifi = (busy - (busyTx + busyRx)) / totalDurationMs; checkIfOutOfBound("nonWifi", nonWifi, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); radioUtil.setNonWifi(nonWifi); - + int pctOBSSAndSelfErrors = (busyRx - busySelf) / totalDurationMs; checkIfOutOfBound("OBSSAndSelfErrors", pctOBSSAndSelfErrors, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); radioUtil.setUnassocClientRx(pctOBSSAndSelfErrors); @@ -1197,6 +1239,90 @@ public class MqttStatsPublisher { updateDeviceStatusRadioUtilizationReport(customerId, equipmentId, radioUtilizationReport); } + private void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode) { + alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)).stream().forEach(a -> { + alarmServiceInterface.delete(customerId, equipmentId, a.getAlarmCode(), a.getLastModifiedTimestamp()); + Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.EQUIPMENT_ADMIN); + if (equipmentAdminStatus == null) { + equipmentAdminStatus = new Status(); + equipmentAdminStatus.setCustomerId(customerId); + equipmentAdminStatus.setEquipmentId(equipmentId); + equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN); + EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData(); + equipmentAdminStatusData.setAlarmTimestamps(new HashMap<>()); + equipmentAdminStatusData.setStatusCode(StatusCode.normal); + equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name()); + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + } else { + EquipmentAdminStatusData equipmentAdminStatusData = (EquipmentAdminStatusData)equipmentAdminStatus.getDetails(); + if (equipmentAdminStatusData.getAlarmTimestamps().containsKey(alarmCode.name())) { + equipmentAdminStatusData.getAlarmTimestamps().remove(alarmCode.name()); + if (equipmentAdminStatusData.getAlarmTimestamps().isEmpty()) { + equipmentAdminStatusData.setStatusCode(StatusCode.normal); + equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name()); + } + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + } + } + }); + } + + void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs) { + // Raise an alarm for temperature + Alarm alarm = new Alarm(); + alarm.setCustomerId(customerId); + alarm.setEquipmentId(equipmentId); + alarm.setAlarmCode(alarmCode); + alarm.setOriginatorType(OriginatorType.AP); + alarm.setSeverity(alarmCode.getSeverity()); + alarm.setScopeType(AlarmScopeType.EQUIPMENT); + alarm.setScopeId("" + equipmentId); + AlarmDetails alarmDetails = new AlarmDetails(); + alarmDetails.setMessage(alarmCode.getDescription()); + alarmDetails.setAffectedEquipmentIds(Collections.singletonList(equipmentId)); + alarm.setDetails(alarmDetails); + + List alarms = alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)); + if (alarms.isEmpty()) { + alarm.setCreatedTimestamp(timestampMs); + alarm = alarmServiceInterface.create(alarm); + } else { + alarm.setCreatedTimestamp(alarms.iterator().next().getCreatedTimestamp()); + alarm.setLastModifiedTimestamp(alarms.iterator().next().getLastModifiedTimestamp()); + alarm = alarmServiceInterface.update(alarm); + } + updateEquipmentAdminStateForReportedAlarms(customerId, equipmentId, alarm.getLastModifiedTimestamp(), alarmCode); + } + + void updateEquipmentAdminStateForReportedAlarms(int customerId, long equipmentId, long timestampMs, AlarmCode alarmCode) { + LOG.debug("updateEquipmentAdminStatusForReportedAlarms for customer {} equipmentId {} timestamp {} alarm {}", customerId, equipmentId, timestampMs, alarmCode); + + Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId,equipmentId, StatusDataType.EQUIPMENT_ADMIN); + if (equipmentAdminStatus == null) { + equipmentAdminStatus = new Status(); + equipmentAdminStatus.setCustomerId(customerId); + equipmentAdminStatus.setEquipmentId(equipmentId); + equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN); + EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData(); + equipmentAdminStatusData.setAlarmTimestamps(new HashMap()); + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus.setCreatedTimestamp(timestampMs); + } + + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).putAlarmTimestamp(alarmCode.getName(), timestampMs); + if (equipmentAdminStatus.getLastModifiedTimestamp() < timestampMs) { + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusCode(alarmCode.getSeverity()); + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusMessage(alarmCode.getDescription()); + } + + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + + LOG.debug("Updated EquipmentAdminStatus for alarm {} to {}", alarmCode, equipmentAdminStatus.toPrettyString()); + + } + private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) { if (checkedValue > 100 || checkedValue < 0) { LOG.warn( @@ -1297,7 +1423,7 @@ public class MqttStatsPublisher { for (NetworkProbe networkProbe : report.getNetworkProbeList()) { NetworkProbeMetrics networkProbeMetrics = new NetworkProbeMetrics(); - + networkProbeMetrics.setSourceTimestampMs(networkProbe.getTimestampMs()); List dnsProbeResults = new ArrayList<>(); @@ -1391,6 +1517,8 @@ public class MqttStatsPublisher { LOG.debug("updated status {}", status); } + + void populateApClientMetrics(List metricRecordList, Report report, int customerId, long equipmentId, long locationId) { LOG.info("populateApClientMetrics for Customer {} Equipment {}", customerId, equipmentId); @@ -1651,7 +1779,7 @@ public class MqttStatsPublisher { // The service metric report's sourceTimestamp will be the most recent ClientReport timestamp if (apSsidMetrics.getSourceTimestampMs() < clientReport.getTimestampMs()) apSsidMetrics.setSourceTimestampMs(clientReport.getTimestampMs()); - + long txBytes = 0L; long rxBytes = 0L; long txFrames = 0L; @@ -1707,7 +1835,7 @@ public class MqttStatsPublisher { rxErrors += client.getStats().getRxErrors(); txErrors += client.getStats().getTxErrors(); lastRssi = client.getStats().getRssi(); - + if (client.hasConnected() && client.getConnected() && client.hasMacAddress()) { numConnectedClients += 1; } diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index d9c8ca8..150666f 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -161,9 +160,8 @@ public class OpensyncMqttClient implements ApplicationListener f1 = futureConnection.connect(); + futureConnection.connect(); LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); @@ -175,7 +173,7 @@ public class OpensyncMqttClient implements ApplicationListener f2 = futureConnection.subscribe(topics); + futureConnection.subscribe(topics); LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); // prepare a JSONPrinter to format protobuf messages as @@ -190,14 +188,27 @@ public class OpensyncMqttClient implements ApplicationListener receive = futureConnection.receive(); - Message mqttMsg = receive.await(); - mqttMsg.ack(); + Message mqttMsg = futureConnection.receive().await(); + + if (mqttMsg == null) { + continue; + } LOG.debug("MQTT Topic {}", mqttMsg.getTopic()); - byte payload[] = mqttMsg.getPayload(); + byte payload[] = mqttMsg.getPayload(); + // we acknowledge right after receive because: + // a. none of the stats messages are so important + // that + // we cannot skip one + // b. if there's some kind of problem with the + // message + // (decoding or processing) + // - we want to move on as quickly as possible and + // not + // let it get stuck in the + // queue + mqttMsg.ack(); messagesReceived.increment(); messageBytesReceived.increment(payload.length);