MQTT Client analysis and optimization

Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
Mike Hansen
2021-07-07 13:30:32 -04:00
parent 927ce4e589
commit 785dc0e6c4

View File

@@ -5,6 +5,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
@@ -25,9 +26,9 @@ import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.TypeRegistry;
import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.monitor.BasicCounter; import com.netflix.servo.monitor.BasicCounter;
import com.netflix.servo.monitor.BasicTimer; import com.netflix.servo.monitor.BasicTimer;
@@ -47,6 +48,8 @@ import sts.OpensyncStats.Report;
@Component @Component
public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> { public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> {
public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30;
private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class); private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class);
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA"); private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
@@ -112,7 +115,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
while (keepReconnecting) { while (keepReconnecting) {
BlockingConnection blockingConnection = null; BlockingConnection blockingConnection = null;
try { try {
Thread.sleep(5000); // Thread.sleep(5000);
// Create a new MQTT connection to the broker. // Create a new MQTT connection to the broker.
/* /*
@@ -152,10 +155,10 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
case PINGREQ.TYPE: case PINGREQ.TYPE:
case PINGRESP.TYPE: case PINGRESP.TYPE:
// PINGs don't want to fill log // PINGs don't want to fill log
LOG.trace("MQTT Client Received: {}", frame); LOG.trace("OpensyncMqttClient Rx: {}", frame);
break; break;
default: default:
LOG.debug("MQTT Client Received: {}", frame); LOG.debug("OpensyncMqttClient Rx: {}", frame);
} }
} }
@@ -165,31 +168,26 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
case PINGREQ.TYPE: case PINGREQ.TYPE:
case PINGRESP.TYPE: case PINGRESP.TYPE:
// PINGs don't want to fill log // PINGs don't want to fill log
LOG.trace("MQTT Client Sent: {}", frame); LOG.trace("OpensyncMqttClient Tx: {}", frame);
break; break;
default: default:
LOG.debug("MQTT Client Sent: {}", frame); LOG.debug("OpensyncMqttClient Tx: {}", frame);
} }
} }
@Override @Override
public void debug(String message, Object... args) { public void debug(String message, Object... args) {
LOG.debug("MQTT Client debugging messages: {}", String.format(message, args)); LOG.info(String.format(message, args));
} }
}); });
blockingConnection = mqtt.blockingConnection(); blockingConnection = mqtt.blockingConnection();
blockingConnection.connect(); blockingConnection.connect();
LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); LOG.debug("Connected to MQTT broker at {}", mqtt.getHost());
// Subscribe to topics: // NB. setting to AT_MOST_ONCE to match the APs message level
// Topic[] topics = {new Topic("/ap/#", QoS.AT_MOST_ONCE),};
// new Topic("mqtt/example/publish", QoS.AT_LEAST_ONCE),
// new Topic("#", QoS.AT_LEAST_ONCE),
// new Topic("test/#", QoS.EXACTLY_ONCE),
// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE)
Topic[] topics = {new Topic("#", QoS.AT_LEAST_ONCE),};
blockingConnection.subscribe(topics); blockingConnection.subscribe(topics);
LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
@@ -207,34 +205,50 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
Message mqttMsg = blockingConnection.receive(); Message mqttMsg = blockingConnection.receive();
if (mqttMsg == null) { if (mqttMsg == null) {
if (LOG.isTraceEnabled())
LOG.trace("NULL message received for blocking connection");
continue; continue;
} }
LOG.debug("MQTT Topic {}", mqttMsg.getTopic());
byte payload[] = mqttMsg.getPayload();
messagesReceived.increment();
messageBytesReceived.increment(payload.length);
Stopwatch stopwatchTimerMessageProcess = timerMessageProcess.start(); Stopwatch stopwatchTimerMessageProcess = timerMessageProcess.start();
LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length);
if (payload[0] == 0x78) {
// looks like zlib-compressed data, let's
// decompress
// it before deserializing
payload = ZlibUtil.decompress(payload);
}
// attempt to parse the message as protobuf
MessageOrBuilder encodedMsg = null;
try { try {
byte payload[] = mqttMsg.getPayload();
messagesReceived.increment();
messageBytesReceived.increment(payload.length);
if (payload[0] == 0x78) {
// looks like zlib-compressed data, let's
// decompress
// it before deserializing
payload = ZlibUtil.decompress(payload);
}
// Only supported protobuf on the TIP opensync APs is Report // Only supported protobuf on the TIP opensync APs is Report
encodedMsg = Report.parseFrom(payload); Report statsReport = Report.parseFrom(payload);
mqttMsg.ack(); if (LOG.isTraceEnabled())
MQTT_LOG.info("topic = {}\nReport = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); LOG.trace("Received opensync stats report for topic = {}\nReport = {}", mqttMsg.getTopic(), statsReport.toString());
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg);
Thread sender = new Thread(statsReport.getNodeID() + "-worker") {
@Override
public void run() {
long startTime = System.nanoTime();
if (LOG.isTraceEnabled())
LOG.trace("Start publishing service metrics from received mqtt stats");
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), statsReport);
mqttMsg.ack();
long elapsedTimeSeconds = TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (elapsedTimeSeconds > REPORT_PROCESSING_THRESHOLD_SEC) {
try {
LOG.warn("Processing threshold exceeded for {}. Elapsed processing time {} seconds. MqttMessage: {}",mqttMsg.getTopic().split("/")[2],
elapsedTimeSeconds, jsonPrinter.print(statsReport));
} catch (InvalidProtocolBufferException e) {
LOG.error("Error parsing Report from protobuffer for Topic {}", mqttMsg.getTopic(), e);
}
}
if (LOG.isTraceEnabled())
LOG.trace("Completed publishing service metrics from received mqtt stats");
}
};
sender.start();
} catch (Exception e) { } catch (Exception e) {
String msgStr = new String(mqttMsg.getPayload(), utf8); String msgStr = new String(mqttMsg.getPayload(), utf8);
LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr); LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr);