diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 9d2504f..aefc62e 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -6,6 +6,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -16,6 +17,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.google.protobuf.ByteString; @@ -23,6 +25,12 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.TypeRegistry; +import com.telecominfraproject.wlan.alarm.AlarmServiceInterface; +import com.telecominfraproject.wlan.alarm.models.Alarm; +import com.telecominfraproject.wlan.alarm.models.AlarmCode; +import com.telecominfraproject.wlan.alarm.models.AlarmDetails; +import com.telecominfraproject.wlan.alarm.models.AlarmScopeType; +import com.telecominfraproject.wlan.alarm.models.OriginatorType; import com.telecominfraproject.wlan.client.ClientServiceInterface; import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails; import com.telecominfraproject.wlan.client.session.models.AssociationState; @@ -68,6 +76,7 @@ import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport; import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports; import com.telecominfraproject.wlan.status.StatusServiceInterface; +import com.telecominfraproject.wlan.status.equipment.models.EquipmentAdminStatusData; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSID; import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSIDs; import com.telecominfraproject.wlan.status.equipment.report.models.EquipmentCapacityDetails; @@ -133,6 +142,17 @@ public class MqttStatsPublisher { private CloudEventDispatcherInterface cloudEventDispatcherInterface; @Autowired private RealtimeEventPublisher realtimeEventPublisher; + @Autowired + private AlarmServiceInterface alarmServiceInterface; + + @Value("${tip.wlan.mqttStatsPublisher.temperatureThresholdInC:80}") + private int temperatureThresholdInC; + + @Value("${tip.wlan.mqttStatsPublisher.cpuUtilThresholdPct:80}") + private int cpuUtilThresholdPct; + + @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") + private int memoryUtilThresholdPct; public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { LOG.info("Received WCStatsReport {}", wcStatsReport.toString()); @@ -222,16 +242,19 @@ public class MqttStatsPublisher { LOG.debug("Current timestamp for service metrics is {}", serviceMetricTimestamp); metricRecordList.stream().forEach(smr -> { smr.setCreatedTimestamp(serviceMetricTimestamp); - if (smr.getLocationId() == 0) smr.setLocationId(locationId); - if(smr.getCustomerId() == 0) smr.setCustomerId(customerId); - if (smr.getEquipmentId() == 0L) smr.setEquipmentId(equipmentId); + if (smr.getLocationId() == 0) + smr.setLocationId(locationId); + if (smr.getCustomerId() == 0) + smr.setCustomerId(customerId); + if (smr.getEquipmentId() == 0L) + smr.setEquipmentId(equipmentId); }); metricRecordList.stream().forEach(smr -> { LOG.debug("ServiceMetric {}", smr); }); cloudEventDispatcherInterface.publishMetrics(metricRecordList); } - + publishEvents(report, customerId, equipmentId, apId, locationId); // handleRssiMetrics(metricRecordList, report, customerId, // equipmentId, locationId); @@ -938,22 +961,41 @@ public class MqttStatsPublisher { numSamples++; } } - if (numSamples > 0) { avgRadioTemp = Math.round((cpuTemperature / numSamples)); apPerformance.setCpuTemperature(avgRadioTemp); } + if (avgRadioTemp > temperatureThresholdInC) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, deviceReport.getTimestampMs()); + } else { + // Clear any existing temperature alarms for this ap + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature); + } } if (deviceReport.hasCpuUtil() && deviceReport.getCpuUtil().hasCpuUtil()) { Integer cpuUtilization = deviceReport.getCpuUtil().getCpuUtil(); apPerformance.setCpuUtilized(new int[] {cpuUtilization}); + if (cpuUtilization > cpuUtilThresholdPct) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, deviceReport.getTimestampMs()); + } else { + // Clear any existing cpuUtilization alarms + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization); + } } apPerformance.setEthLinkState(EthernetLinkState.UP1000_FULL_DUPLEX); if (deviceReport.hasMemUtil() && deviceReport.getMemUtil().hasMemTotal() && deviceReport.getMemUtil().hasMemUsed()) { apPerformance.setFreeMemory(deviceReport.getMemUtil().getMemTotal() - deviceReport.getMemUtil().getMemUsed()); + + if (deviceReport.getMemUtil().getMemUsed()/deviceReport.getMemUtil().getMemTotal() > memoryUtilThresholdPct) { + raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs()); + } else { + // Clear any existing cpuUtilization alarms + clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization); + } + } apPerformance.setUpTime((long) deviceReport.getUptime()); @@ -969,7 +1011,6 @@ public class MqttStatsPublisher { if (apNodeMetrics.getSourceTimestampMs() < deviceReport.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(deviceReport.getTimestampMs()); updateDeviceStatusForReport(customerId, equipmentId, deviceReport, avgRadioTemp); - } // statusList.add(status); @@ -1067,11 +1108,11 @@ public class MqttStatsPublisher { radioStats.setNumRxErr(rxErrors); radioStats.setNumRxRetry(rxRetries); radioStats.setSourceTimestampMs(clReport.getTimestampMs()); - + // The service metric report's sourceTimestamp will be the most recent timestamp from its contributing stats if (apNodeMetrics.getSourceTimestampMs() < clReport.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(clReport.getTimestampMs()); - + apNodeMetrics.setRadioStats(radioType, radioStats); apNodeMetrics.setRxBytes(radioType, rxBytes); @@ -1132,11 +1173,12 @@ public class MqttStatsPublisher { RadioUtilization radioUtil = new RadioUtilization(); radioUtil.setTimestampSeconds((int) ((survey.getTimestampMs()) / 1000)); radioUtil.setSourceTimestampMs(survey.getTimestampMs()); - - // The service metric report's sourceTimestamp will be the most recent timestamp from its contributing stats + + // The service metric report's sourceTimestamp will be the most recent timestamp from its + // contributing stats if (apNodeMetrics.getSourceTimestampMs() < survey.getTimestampMs()) apNodeMetrics.setSourceTimestampMs(survey.getTimestampMs()); - + int pctBusyTx = busyTx / totalDurationMs; checkIfOutOfBound("pctBusyTx", pctBusyTx, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); @@ -1150,7 +1192,7 @@ public class MqttStatsPublisher { int nonWifi = (busy - (busyTx + busyRx)) / totalDurationMs; checkIfOutOfBound("nonWifi", nonWifi, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); radioUtil.setNonWifi(nonWifi); - + int pctOBSSAndSelfErrors = (busyRx - busySelf) / totalDurationMs; checkIfOutOfBound("OBSSAndSelfErrors", pctOBSSAndSelfErrors, survey, totalDurationMs, busyTx, busyRx, busy, busySelf); radioUtil.setUnassocClientRx(pctOBSSAndSelfErrors); @@ -1197,6 +1239,90 @@ public class MqttStatsPublisher { updateDeviceStatusRadioUtilizationReport(customerId, equipmentId, radioUtilizationReport); } + private void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode) { + alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)).stream().forEach(a -> { + alarmServiceInterface.delete(customerId, equipmentId, a.getAlarmCode(), a.getLastModifiedTimestamp()); + Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.EQUIPMENT_ADMIN); + if (equipmentAdminStatus == null) { + equipmentAdminStatus = new Status(); + equipmentAdminStatus.setCustomerId(customerId); + equipmentAdminStatus.setEquipmentId(equipmentId); + equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN); + EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData(); + equipmentAdminStatusData.setAlarmTimestamps(new HashMap<>()); + equipmentAdminStatusData.setStatusCode(StatusCode.normal); + equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name()); + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + } else { + EquipmentAdminStatusData equipmentAdminStatusData = (EquipmentAdminStatusData)equipmentAdminStatus.getDetails(); + if (equipmentAdminStatusData.getAlarmTimestamps().containsKey(alarmCode.name())) { + equipmentAdminStatusData.getAlarmTimestamps().remove(alarmCode.name()); + if (equipmentAdminStatusData.getAlarmTimestamps().isEmpty()) { + equipmentAdminStatusData.setStatusCode(StatusCode.normal); + equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name()); + } + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + } + } + }); + } + + void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs) { + // Raise an alarm for temperature + Alarm alarm = new Alarm(); + alarm.setCustomerId(customerId); + alarm.setEquipmentId(equipmentId); + alarm.setAlarmCode(alarmCode); + alarm.setOriginatorType(OriginatorType.AP); + alarm.setSeverity(alarmCode.getSeverity()); + alarm.setScopeType(AlarmScopeType.EQUIPMENT); + alarm.setScopeId("" + equipmentId); + AlarmDetails alarmDetails = new AlarmDetails(); + alarmDetails.setMessage(alarmCode.getDescription()); + alarmDetails.setAffectedEquipmentIds(Collections.singletonList(equipmentId)); + alarm.setDetails(alarmDetails); + + List alarms = alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)); + if (alarms.isEmpty()) { + alarm.setCreatedTimestamp(timestampMs); + alarm = alarmServiceInterface.create(alarm); + } else { + alarm.setCreatedTimestamp(alarms.iterator().next().getCreatedTimestamp()); + alarm.setLastModifiedTimestamp(alarms.iterator().next().getLastModifiedTimestamp()); + alarm = alarmServiceInterface.update(alarm); + } + updateEquipmentAdminStateForReportedAlarms(customerId, equipmentId, alarm.getLastModifiedTimestamp(), alarmCode); + } + + void updateEquipmentAdminStateForReportedAlarms(int customerId, long equipmentId, long timestampMs, AlarmCode alarmCode) { + LOG.debug("updateEquipmentAdminStatusForReportedAlarms for customer {} equipmentId {} timestamp {} alarm {}", customerId, equipmentId, timestampMs, alarmCode); + + Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId,equipmentId, StatusDataType.EQUIPMENT_ADMIN); + if (equipmentAdminStatus == null) { + equipmentAdminStatus = new Status(); + equipmentAdminStatus.setCustomerId(customerId); + equipmentAdminStatus.setEquipmentId(equipmentId); + equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN); + EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData(); + equipmentAdminStatusData.setAlarmTimestamps(new HashMap()); + equipmentAdminStatus.setDetails(equipmentAdminStatusData); + equipmentAdminStatus.setCreatedTimestamp(timestampMs); + } + + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).putAlarmTimestamp(alarmCode.getName(), timestampMs); + if (equipmentAdminStatus.getLastModifiedTimestamp() < timestampMs) { + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusCode(alarmCode.getSeverity()); + ((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusMessage(alarmCode.getDescription()); + } + + equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus); + + LOG.debug("Updated EquipmentAdminStatus for alarm {} to {}", alarmCode, equipmentAdminStatus.toPrettyString()); + + } + private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) { if (checkedValue > 100 || checkedValue < 0) { LOG.warn( @@ -1297,7 +1423,7 @@ public class MqttStatsPublisher { for (NetworkProbe networkProbe : report.getNetworkProbeList()) { NetworkProbeMetrics networkProbeMetrics = new NetworkProbeMetrics(); - + networkProbeMetrics.setSourceTimestampMs(networkProbe.getTimestampMs()); List dnsProbeResults = new ArrayList<>(); @@ -1391,6 +1517,8 @@ public class MqttStatsPublisher { LOG.debug("updated status {}", status); } + + void populateApClientMetrics(List metricRecordList, Report report, int customerId, long equipmentId, long locationId) { LOG.info("populateApClientMetrics for Customer {} Equipment {}", customerId, equipmentId); @@ -1651,7 +1779,7 @@ public class MqttStatsPublisher { // The service metric report's sourceTimestamp will be the most recent ClientReport timestamp if (apSsidMetrics.getSourceTimestampMs() < clientReport.getTimestampMs()) apSsidMetrics.setSourceTimestampMs(clientReport.getTimestampMs()); - + long txBytes = 0L; long rxBytes = 0L; long txFrames = 0L; @@ -1707,7 +1835,7 @@ public class MqttStatsPublisher { rxErrors += client.getStats().getRxErrors(); txErrors += client.getStats().getTxErrors(); lastRssi = client.getStats().getRssi(); - + if (client.hasConnected() && client.getConnected() && client.hasMacAddress()) { numConnectedClients += 1; } diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index d9c8ca8..150666f 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -161,9 +160,8 @@ public class OpensyncMqttClient implements ApplicationListener f1 = futureConnection.connect(); + futureConnection.connect(); LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); @@ -175,7 +173,7 @@ public class OpensyncMqttClient implements ApplicationListener f2 = futureConnection.subscribe(topics); + futureConnection.subscribe(topics); LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); // prepare a JSONPrinter to format protobuf messages as @@ -190,14 +188,27 @@ public class OpensyncMqttClient implements ApplicationListener receive = futureConnection.receive(); - Message mqttMsg = receive.await(); - mqttMsg.ack(); + Message mqttMsg = futureConnection.receive().await(); + + if (mqttMsg == null) { + continue; + } LOG.debug("MQTT Topic {}", mqttMsg.getTopic()); - byte payload[] = mqttMsg.getPayload(); + byte payload[] = mqttMsg.getPayload(); + // we acknowledge right after receive because: + // a. none of the stats messages are so important + // that + // we cannot skip one + // b. if there's some kind of problem with the + // message + // (decoding or processing) + // - we want to move on as quickly as possible and + // not + // let it get stuck in the + // queue + mqttMsg.ack(); messagesReceived.increment(); messageBytesReceived.increment(payload.length);