mirror of
https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
synced 2025-11-02 19:47:52 +00:00
MQTT message delivery reliability
- MqttStatsPublisher measure time for overall and per report metrics generation Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
@@ -5,7 +5,6 @@ import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.fusesource.mqtt.client.BlockingConnection;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
@@ -13,19 +12,9 @@ import org.fusesource.mqtt.client.Message;
|
||||
import org.fusesource.mqtt.client.QoS;
|
||||
import org.fusesource.mqtt.client.Topic;
|
||||
import org.fusesource.mqtt.client.Tracer;
|
||||
import org.fusesource.mqtt.codec.CONNACK;
|
||||
import org.fusesource.mqtt.codec.CONNECT;
|
||||
import org.fusesource.mqtt.codec.DISCONNECT;
|
||||
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||
import org.fusesource.mqtt.codec.PINGREQ;
|
||||
import org.fusesource.mqtt.codec.PINGRESP;
|
||||
import org.fusesource.mqtt.codec.PUBACK;
|
||||
import org.fusesource.mqtt.codec.PUBCOMP;
|
||||
import org.fusesource.mqtt.codec.PUBLISH;
|
||||
import org.fusesource.mqtt.codec.PUBREC;
|
||||
import org.fusesource.mqtt.codec.PUBREL;
|
||||
import org.fusesource.mqtt.codec.SUBACK;
|
||||
import org.fusesource.mqtt.codec.SUBSCRIBE;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -36,7 +25,6 @@ import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.TypeRegistry;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import com.netflix.servo.DefaultMonitorRegistry;
|
||||
@@ -48,7 +36,6 @@ import com.netflix.servo.monitor.Stopwatch;
|
||||
import com.netflix.servo.monitor.Timer;
|
||||
import com.netflix.servo.tag.TagList;
|
||||
import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags;
|
||||
import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface;
|
||||
import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface;
|
||||
import com.telecominfraproject.wlan.opensync.util.ZlibUtil;
|
||||
|
||||
@@ -59,14 +46,19 @@ import sts.OpensyncStats.Report;
|
||||
@Component
|
||||
public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> {
|
||||
|
||||
public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30;
|
||||
// private static final String METRICS_WKR_PFX = "metrics-wkr-";
|
||||
|
||||
// public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class);
|
||||
|
||||
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
|
||||
|
||||
|
||||
private static final Logger MQTT_TRACER_LOG = LoggerFactory.getLogger("MQTT_CLIENT_TRACER");
|
||||
|
||||
// private static final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new
|
||||
// CustomizableThreadFactory(METRICS_WKR_PFX));
|
||||
|
||||
public static Charset utf8 = Charset.forName("UTF-8");
|
||||
|
||||
private final TagList tags = CloudMetricsTags.commonTags;
|
||||
@@ -128,7 +120,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
||||
while (keepReconnecting) {
|
||||
BlockingConnection blockingConnection = null;
|
||||
try {
|
||||
// Thread.sleep(5000);
|
||||
// Thread.sleep(5000);
|
||||
|
||||
// Create a new MQTT connection to the broker.
|
||||
/*
|
||||
@@ -169,17 +161,8 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
||||
// PINGs don't want to fill log
|
||||
MQTT_TRACER_LOG.trace("OpensyncMqttClient Rx: {}", frame);
|
||||
break;
|
||||
case SUBACK.TYPE:
|
||||
case CONNACK.TYPE:
|
||||
case PUBACK.TYPE:
|
||||
case PUBCOMP.TYPE:
|
||||
case PUBREC.TYPE:
|
||||
case PUBREL.TYPE:
|
||||
case PUBLISH.TYPE:
|
||||
MQTT_TRACER_LOG.info("OpensyncMqttClient Rx: {}", frame);
|
||||
break;
|
||||
default:
|
||||
MQTT_TRACER_LOG.debug("OpensyncMqttClient Rx: {}", frame);
|
||||
MQTT_TRACER_LOG.info("OpensyncMqttClient Rx: {}", frame);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,22 +172,15 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
||||
case PINGREQ.TYPE:
|
||||
// PINGs don't want to fill log
|
||||
MQTT_TRACER_LOG.trace("OpensyncMqttClient Tx: {}", frame);
|
||||
break;
|
||||
case SUBSCRIBE.TYPE:
|
||||
case CONNECT.TYPE:
|
||||
case DISCONNECT.TYPE:
|
||||
case PUBACK.TYPE:
|
||||
case PUBCOMP.TYPE:
|
||||
MQTT_TRACER_LOG.info("OpensyncMqttClient Tx: {}", frame);
|
||||
break;
|
||||
break;
|
||||
default:
|
||||
MQTT_TRACER_LOG.debug("OpensyncMqttClient Tx: {}", frame);
|
||||
MQTT_TRACER_LOG.info("OpensyncMqttClient Tx: {}", frame);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String message, Object... args) {
|
||||
LOG.info(String.format(message, args));
|
||||
MQTT_TRACER_LOG.info(String.format(message, args));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -229,6 +205,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
||||
|
||||
// main loop - receive messages
|
||||
while (true) {
|
||||
LOG.debug("{} awaiting mqtt message from broker",mqttClientThread.getName());
|
||||
Message mqttMsg = blockingConnection.receive();
|
||||
|
||||
if (mqttMsg == null) {
|
||||
@@ -252,30 +229,9 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
|
||||
Report statsReport = Report.parseFrom(payload);
|
||||
mqttMsg.ack();
|
||||
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace("Received opensync stats report for topic = {}\nReport = {}", mqttMsg.getTopic(), statsReport.toString());
|
||||
MQTT_LOG.info("Topic {}\n{}",mqttMsg.getTopic(), jsonPrinter.print(statsReport));
|
||||
Thread sender = new Thread(mqttMsg.getTopic().split("/")[2] + "-worker") {
|
||||
@Override
|
||||
public void run() {
|
||||
long startTime = System.nanoTime();
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace("Start publishing service metrics from received mqtt stats");
|
||||
statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport);
|
||||
long elapsedTimeSeconds = TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
||||
if (elapsedTimeSeconds > REPORT_PROCESSING_THRESHOLD_SEC) {
|
||||
try {
|
||||
LOG.warn("Processing threshold exceeded for {}. Elapsed processing time {} seconds. MqttMessage: {}",mqttMsg.getTopic().split("/")[2],
|
||||
elapsedTimeSeconds, jsonPrinter.print(statsReport));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
LOG.error("Error parsing Report from protobuffer for Topic {}", mqttMsg.getTopic(), e);
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace("Completed publishing service metrics from received mqtt stats");
|
||||
}
|
||||
};
|
||||
sender.start();
|
||||
MQTT_LOG.info("Topic {}\n{}", mqttMsg.getTopic(), jsonPrinter.print(statsReport));
|
||||
statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport);
|
||||
LOG.debug("Dispatched report for topic {} to backend for processing", mqttMsg.getTopic());
|
||||
|
||||
} catch (Exception e) {
|
||||
String msgStr = new String(mqttMsg.getPayload(), utf8);
|
||||
|
||||
Reference in New Issue
Block a user