mirror of
https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
synced 2025-11-01 11:07:49 +00:00
Move the raising and clearing of the threshold alarms into the gateway controller when the device information is received in the MQTT device report. Update EquipmentAdminState alarms or clear if required.
Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
@@ -6,6 +6,7 @@ import java.net.Inet6Address;
|
|||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -16,6 +17,7 @@ import java.util.Set;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
@@ -23,6 +25,12 @@ import com.google.protobuf.Descriptors;
|
|||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.util.JsonFormat;
|
import com.google.protobuf.util.JsonFormat;
|
||||||
import com.google.protobuf.util.JsonFormat.TypeRegistry;
|
import com.google.protobuf.util.JsonFormat.TypeRegistry;
|
||||||
|
import com.telecominfraproject.wlan.alarm.AlarmServiceInterface;
|
||||||
|
import com.telecominfraproject.wlan.alarm.models.Alarm;
|
||||||
|
import com.telecominfraproject.wlan.alarm.models.AlarmCode;
|
||||||
|
import com.telecominfraproject.wlan.alarm.models.AlarmDetails;
|
||||||
|
import com.telecominfraproject.wlan.alarm.models.AlarmScopeType;
|
||||||
|
import com.telecominfraproject.wlan.alarm.models.OriginatorType;
|
||||||
import com.telecominfraproject.wlan.client.ClientServiceInterface;
|
import com.telecominfraproject.wlan.client.ClientServiceInterface;
|
||||||
import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails;
|
import com.telecominfraproject.wlan.client.info.models.ClientInfoDetails;
|
||||||
import com.telecominfraproject.wlan.client.session.models.AssociationState;
|
import com.telecominfraproject.wlan.client.session.models.AssociationState;
|
||||||
@@ -68,6 +76,7 @@ import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric;
|
|||||||
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport;
|
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourReport;
|
||||||
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports;
|
import com.telecominfraproject.wlan.servicemetric.neighbourscan.models.NeighbourScanReports;
|
||||||
import com.telecominfraproject.wlan.status.StatusServiceInterface;
|
import com.telecominfraproject.wlan.status.StatusServiceInterface;
|
||||||
|
import com.telecominfraproject.wlan.status.equipment.models.EquipmentAdminStatusData;
|
||||||
import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSID;
|
import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSID;
|
||||||
import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSIDs;
|
import com.telecominfraproject.wlan.status.equipment.report.models.ActiveBSSIDs;
|
||||||
import com.telecominfraproject.wlan.status.equipment.report.models.EquipmentCapacityDetails;
|
import com.telecominfraproject.wlan.status.equipment.report.models.EquipmentCapacityDetails;
|
||||||
@@ -133,6 +142,17 @@ public class MqttStatsPublisher {
|
|||||||
private CloudEventDispatcherInterface cloudEventDispatcherInterface;
|
private CloudEventDispatcherInterface cloudEventDispatcherInterface;
|
||||||
@Autowired
|
@Autowired
|
||||||
private RealtimeEventPublisher realtimeEventPublisher;
|
private RealtimeEventPublisher realtimeEventPublisher;
|
||||||
|
@Autowired
|
||||||
|
private AlarmServiceInterface alarmServiceInterface;
|
||||||
|
|
||||||
|
@Value("${tip.wlan.mqttStatsPublisher.temperatureThresholdInC:80}")
|
||||||
|
private int temperatureThresholdInC;
|
||||||
|
|
||||||
|
@Value("${tip.wlan.mqttStatsPublisher.cpuUtilThresholdPct:80}")
|
||||||
|
private int cpuUtilThresholdPct;
|
||||||
|
|
||||||
|
@Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}")
|
||||||
|
private int memoryUtilThresholdPct;
|
||||||
|
|
||||||
public void processMqttMessage(String topic, WCStatsReport wcStatsReport) {
|
public void processMqttMessage(String topic, WCStatsReport wcStatsReport) {
|
||||||
LOG.info("Received WCStatsReport {}", wcStatsReport.toString());
|
LOG.info("Received WCStatsReport {}", wcStatsReport.toString());
|
||||||
@@ -222,9 +242,12 @@ public class MqttStatsPublisher {
|
|||||||
LOG.debug("Current timestamp for service metrics is {}", serviceMetricTimestamp);
|
LOG.debug("Current timestamp for service metrics is {}", serviceMetricTimestamp);
|
||||||
metricRecordList.stream().forEach(smr -> {
|
metricRecordList.stream().forEach(smr -> {
|
||||||
smr.setCreatedTimestamp(serviceMetricTimestamp);
|
smr.setCreatedTimestamp(serviceMetricTimestamp);
|
||||||
if (smr.getLocationId() == 0) smr.setLocationId(locationId);
|
if (smr.getLocationId() == 0)
|
||||||
if(smr.getCustomerId() == 0) smr.setCustomerId(customerId);
|
smr.setLocationId(locationId);
|
||||||
if (smr.getEquipmentId() == 0L) smr.setEquipmentId(equipmentId);
|
if (smr.getCustomerId() == 0)
|
||||||
|
smr.setCustomerId(customerId);
|
||||||
|
if (smr.getEquipmentId() == 0L)
|
||||||
|
smr.setEquipmentId(equipmentId);
|
||||||
});
|
});
|
||||||
metricRecordList.stream().forEach(smr -> {
|
metricRecordList.stream().forEach(smr -> {
|
||||||
LOG.debug("ServiceMetric {}", smr);
|
LOG.debug("ServiceMetric {}", smr);
|
||||||
@@ -938,22 +961,41 @@ public class MqttStatsPublisher {
|
|||||||
numSamples++;
|
numSamples++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numSamples > 0) {
|
if (numSamples > 0) {
|
||||||
avgRadioTemp = Math.round((cpuTemperature / numSamples));
|
avgRadioTemp = Math.round((cpuTemperature / numSamples));
|
||||||
apPerformance.setCpuTemperature(avgRadioTemp);
|
apPerformance.setCpuTemperature(avgRadioTemp);
|
||||||
}
|
}
|
||||||
|
if (avgRadioTemp > temperatureThresholdInC) {
|
||||||
|
raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature, deviceReport.getTimestampMs());
|
||||||
|
} else {
|
||||||
|
// Clear any existing temperature alarms for this ap
|
||||||
|
clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUTemperature);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (deviceReport.hasCpuUtil() && deviceReport.getCpuUtil().hasCpuUtil()) {
|
if (deviceReport.hasCpuUtil() && deviceReport.getCpuUtil().hasCpuUtil()) {
|
||||||
Integer cpuUtilization = deviceReport.getCpuUtil().getCpuUtil();
|
Integer cpuUtilization = deviceReport.getCpuUtil().getCpuUtil();
|
||||||
apPerformance.setCpuUtilized(new int[] {cpuUtilization});
|
apPerformance.setCpuUtilized(new int[] {cpuUtilization});
|
||||||
|
if (cpuUtilization > cpuUtilThresholdPct) {
|
||||||
|
raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization, deviceReport.getTimestampMs());
|
||||||
|
} else {
|
||||||
|
// Clear any existing cpuUtilization alarms
|
||||||
|
clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.CPUUtilization);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
apPerformance.setEthLinkState(EthernetLinkState.UP1000_FULL_DUPLEX);
|
apPerformance.setEthLinkState(EthernetLinkState.UP1000_FULL_DUPLEX);
|
||||||
|
|
||||||
if (deviceReport.hasMemUtil() && deviceReport.getMemUtil().hasMemTotal() && deviceReport.getMemUtil().hasMemUsed()) {
|
if (deviceReport.hasMemUtil() && deviceReport.getMemUtil().hasMemTotal() && deviceReport.getMemUtil().hasMemUsed()) {
|
||||||
apPerformance.setFreeMemory(deviceReport.getMemUtil().getMemTotal() - deviceReport.getMemUtil().getMemUsed());
|
apPerformance.setFreeMemory(deviceReport.getMemUtil().getMemTotal() - deviceReport.getMemUtil().getMemUsed());
|
||||||
|
|
||||||
|
if (deviceReport.getMemUtil().getMemUsed()/deviceReport.getMemUtil().getMemTotal() > memoryUtilThresholdPct) {
|
||||||
|
raiseDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization, deviceReport.getTimestampMs());
|
||||||
|
} else {
|
||||||
|
// Clear any existing cpuUtilization alarms
|
||||||
|
clearDeviceThresholdAlarm(customerId, equipmentId, AlarmCode.MemoryUtilization);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
apPerformance.setUpTime((long) deviceReport.getUptime());
|
apPerformance.setUpTime((long) deviceReport.getUptime());
|
||||||
|
|
||||||
@@ -969,7 +1011,6 @@ public class MqttStatsPublisher {
|
|||||||
if (apNodeMetrics.getSourceTimestampMs() < deviceReport.getTimestampMs())
|
if (apNodeMetrics.getSourceTimestampMs() < deviceReport.getTimestampMs())
|
||||||
apNodeMetrics.setSourceTimestampMs(deviceReport.getTimestampMs());
|
apNodeMetrics.setSourceTimestampMs(deviceReport.getTimestampMs());
|
||||||
updateDeviceStatusForReport(customerId, equipmentId, deviceReport, avgRadioTemp);
|
updateDeviceStatusForReport(customerId, equipmentId, deviceReport, avgRadioTemp);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// statusList.add(status);
|
// statusList.add(status);
|
||||||
@@ -1133,7 +1174,8 @@ public class MqttStatsPublisher {
|
|||||||
radioUtil.setTimestampSeconds((int) ((survey.getTimestampMs()) / 1000));
|
radioUtil.setTimestampSeconds((int) ((survey.getTimestampMs()) / 1000));
|
||||||
radioUtil.setSourceTimestampMs(survey.getTimestampMs());
|
radioUtil.setSourceTimestampMs(survey.getTimestampMs());
|
||||||
|
|
||||||
// The service metric report's sourceTimestamp will be the most recent timestamp from its contributing stats
|
// The service metric report's sourceTimestamp will be the most recent timestamp from its
|
||||||
|
// contributing stats
|
||||||
if (apNodeMetrics.getSourceTimestampMs() < survey.getTimestampMs())
|
if (apNodeMetrics.getSourceTimestampMs() < survey.getTimestampMs())
|
||||||
apNodeMetrics.setSourceTimestampMs(survey.getTimestampMs());
|
apNodeMetrics.setSourceTimestampMs(survey.getTimestampMs());
|
||||||
|
|
||||||
@@ -1197,6 +1239,90 @@ public class MqttStatsPublisher {
|
|||||||
updateDeviceStatusRadioUtilizationReport(customerId, equipmentId, radioUtilizationReport);
|
updateDeviceStatusRadioUtilizationReport(customerId, equipmentId, radioUtilizationReport);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void clearDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode) {
|
||||||
|
alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode)).stream().forEach(a -> {
|
||||||
|
alarmServiceInterface.delete(customerId, equipmentId, a.getAlarmCode(), a.getLastModifiedTimestamp());
|
||||||
|
Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId, equipmentId, StatusDataType.EQUIPMENT_ADMIN);
|
||||||
|
if (equipmentAdminStatus == null) {
|
||||||
|
equipmentAdminStatus = new Status();
|
||||||
|
equipmentAdminStatus.setCustomerId(customerId);
|
||||||
|
equipmentAdminStatus.setEquipmentId(equipmentId);
|
||||||
|
equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN);
|
||||||
|
EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData();
|
||||||
|
equipmentAdminStatusData.setAlarmTimestamps(new HashMap<>());
|
||||||
|
equipmentAdminStatusData.setStatusCode(StatusCode.normal);
|
||||||
|
equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name());
|
||||||
|
equipmentAdminStatus.setDetails(equipmentAdminStatusData);
|
||||||
|
equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus);
|
||||||
|
} else {
|
||||||
|
EquipmentAdminStatusData equipmentAdminStatusData = (EquipmentAdminStatusData)equipmentAdminStatus.getDetails();
|
||||||
|
if (equipmentAdminStatusData.getAlarmTimestamps().containsKey(alarmCode.name())) {
|
||||||
|
equipmentAdminStatusData.getAlarmTimestamps().remove(alarmCode.name());
|
||||||
|
if (equipmentAdminStatusData.getAlarmTimestamps().isEmpty()) {
|
||||||
|
equipmentAdminStatusData.setStatusCode(StatusCode.normal);
|
||||||
|
equipmentAdminStatusData.setStatusMessage(StatusCode.normal.name());
|
||||||
|
}
|
||||||
|
equipmentAdminStatus.setDetails(equipmentAdminStatusData);
|
||||||
|
equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void raiseDeviceThresholdAlarm(int customerId, long equipmentId, AlarmCode alarmCode, long timestampMs) {
|
||||||
|
// Raise an alarm for temperature
|
||||||
|
Alarm alarm = new Alarm();
|
||||||
|
alarm.setCustomerId(customerId);
|
||||||
|
alarm.setEquipmentId(equipmentId);
|
||||||
|
alarm.setAlarmCode(alarmCode);
|
||||||
|
alarm.setOriginatorType(OriginatorType.AP);
|
||||||
|
alarm.setSeverity(alarmCode.getSeverity());
|
||||||
|
alarm.setScopeType(AlarmScopeType.EQUIPMENT);
|
||||||
|
alarm.setScopeId("" + equipmentId);
|
||||||
|
AlarmDetails alarmDetails = new AlarmDetails();
|
||||||
|
alarmDetails.setMessage(alarmCode.getDescription());
|
||||||
|
alarmDetails.setAffectedEquipmentIds(Collections.singletonList(equipmentId));
|
||||||
|
alarm.setDetails(alarmDetails);
|
||||||
|
|
||||||
|
List<Alarm> alarms = alarmServiceInterface.get(customerId, Set.of(equipmentId), Set.of(alarmCode));
|
||||||
|
if (alarms.isEmpty()) {
|
||||||
|
alarm.setCreatedTimestamp(timestampMs);
|
||||||
|
alarm = alarmServiceInterface.create(alarm);
|
||||||
|
} else {
|
||||||
|
alarm.setCreatedTimestamp(alarms.iterator().next().getCreatedTimestamp());
|
||||||
|
alarm.setLastModifiedTimestamp(alarms.iterator().next().getLastModifiedTimestamp());
|
||||||
|
alarm = alarmServiceInterface.update(alarm);
|
||||||
|
}
|
||||||
|
updateEquipmentAdminStateForReportedAlarms(customerId, equipmentId, alarm.getLastModifiedTimestamp(), alarmCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateEquipmentAdminStateForReportedAlarms(int customerId, long equipmentId, long timestampMs, AlarmCode alarmCode) {
|
||||||
|
LOG.debug("updateEquipmentAdminStatusForReportedAlarms for customer {} equipmentId {} timestamp {} alarm {}", customerId, equipmentId, timestampMs, alarmCode);
|
||||||
|
|
||||||
|
Status equipmentAdminStatus = statusServiceInterface.getOrNull(customerId,equipmentId, StatusDataType.EQUIPMENT_ADMIN);
|
||||||
|
if (equipmentAdminStatus == null) {
|
||||||
|
equipmentAdminStatus = new Status();
|
||||||
|
equipmentAdminStatus.setCustomerId(customerId);
|
||||||
|
equipmentAdminStatus.setEquipmentId(equipmentId);
|
||||||
|
equipmentAdminStatus.setStatusDataType(StatusDataType.EQUIPMENT_ADMIN);
|
||||||
|
EquipmentAdminStatusData equipmentAdminStatusData = new EquipmentAdminStatusData();
|
||||||
|
equipmentAdminStatusData.setAlarmTimestamps(new HashMap<String,Long>());
|
||||||
|
equipmentAdminStatus.setDetails(equipmentAdminStatusData);
|
||||||
|
equipmentAdminStatus.setCreatedTimestamp(timestampMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).putAlarmTimestamp(alarmCode.getName(), timestampMs);
|
||||||
|
if (equipmentAdminStatus.getLastModifiedTimestamp() < timestampMs) {
|
||||||
|
((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusCode(alarmCode.getSeverity());
|
||||||
|
((EquipmentAdminStatusData) equipmentAdminStatus.getDetails()).setStatusMessage(alarmCode.getDescription());
|
||||||
|
}
|
||||||
|
|
||||||
|
equipmentAdminStatus = statusServiceInterface.update(equipmentAdminStatus);
|
||||||
|
|
||||||
|
LOG.debug("Updated EquipmentAdminStatus for alarm {} to {}", alarmCode, equipmentAdminStatus.toPrettyString());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) {
|
private void checkIfOutOfBound(String checkedType, int checkedValue, Survey survey, int totalDurationMs, int busyTx, int busyRx, int busy, int busySelf) {
|
||||||
if (checkedValue > 100 || checkedValue < 0) {
|
if (checkedValue > 100 || checkedValue < 0) {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
@@ -1391,6 +1517,8 @@ public class MqttStatsPublisher {
|
|||||||
LOG.debug("updated status {}", status);
|
LOG.debug("updated status {}", status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void populateApClientMetrics(List<ServiceMetric> metricRecordList, Report report, int customerId, long equipmentId, long locationId) {
|
void populateApClientMetrics(List<ServiceMetric> metricRecordList, Report report, int customerId, long equipmentId, long locationId) {
|
||||||
LOG.info("populateApClientMetrics for Customer {} Equipment {}", customerId, equipmentId);
|
LOG.info("populateApClientMetrics for Customer {} Equipment {}", customerId, equipmentId);
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import java.util.ArrayList;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.fusesource.mqtt.client.Future;
|
|
||||||
import org.fusesource.mqtt.client.FutureConnection;
|
import org.fusesource.mqtt.client.FutureConnection;
|
||||||
import org.fusesource.mqtt.client.MQTT;
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
import org.fusesource.mqtt.client.Message;
|
import org.fusesource.mqtt.client.Message;
|
||||||
@@ -161,9 +160,8 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
|||||||
|
|
||||||
// TODO: revisit this blocking connection, change it to
|
// TODO: revisit this blocking connection, change it to
|
||||||
// futureConnection
|
// futureConnection
|
||||||
|
|
||||||
futureConnection = mqtt.futureConnection();
|
futureConnection = mqtt.futureConnection();
|
||||||
Future<Void> f1 = futureConnection.connect();
|
futureConnection.connect();
|
||||||
|
|
||||||
LOG.info("Connected to MQTT broker at {}", mqtt.getHost());
|
LOG.info("Connected to MQTT broker at {}", mqtt.getHost());
|
||||||
|
|
||||||
@@ -175,7 +173,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
|||||||
// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE)
|
// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE)
|
||||||
Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), };
|
Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), };
|
||||||
|
|
||||||
Future<byte[]> f2 = futureConnection.subscribe(topics);
|
futureConnection.subscribe(topics);
|
||||||
LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
|
LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
|
||||||
|
|
||||||
// prepare a JSONPrinter to format protobuf messages as
|
// prepare a JSONPrinter to format protobuf messages as
|
||||||
@@ -190,14 +188,27 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
|||||||
|
|
||||||
// main loop - receive messages
|
// main loop - receive messages
|
||||||
while (true) {
|
while (true) {
|
||||||
|
Message mqttMsg = futureConnection.receive().await();
|
||||||
|
|
||||||
Future<Message> receive = futureConnection.receive();
|
if (mqttMsg == null) {
|
||||||
Message mqttMsg = receive.await();
|
continue;
|
||||||
mqttMsg.ack();
|
}
|
||||||
|
|
||||||
LOG.debug("MQTT Topic {}", mqttMsg.getTopic());
|
LOG.debug("MQTT Topic {}", mqttMsg.getTopic());
|
||||||
byte payload[] = mqttMsg.getPayload();
|
|
||||||
|
|
||||||
|
byte payload[] = mqttMsg.getPayload();
|
||||||
|
// we acknowledge right after receive because:
|
||||||
|
// a. none of the stats messages are so important
|
||||||
|
// that
|
||||||
|
// we cannot skip one
|
||||||
|
// b. if there's some kind of problem with the
|
||||||
|
// message
|
||||||
|
// (decoding or processing)
|
||||||
|
// - we want to move on as quickly as possible and
|
||||||
|
// not
|
||||||
|
// let it get stuck in the
|
||||||
|
// queue
|
||||||
|
mqttMsg.ack();
|
||||||
|
|
||||||
messagesReceived.increment();
|
messagesReceived.increment();
|
||||||
messageBytesReceived.increment(payload.length);
|
messageBytesReceived.increment(payload.length);
|
||||||
|
|||||||
Reference in New Issue
Block a user