diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index de3ad90..c788afa 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -5,6 +5,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -25,9 +26,9 @@ import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import com.google.protobuf.Descriptors; -import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.TypeRegistry; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.TypeRegistry; import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.monitor.BasicCounter; import com.netflix.servo.monitor.BasicTimer; @@ -47,6 +48,8 @@ import sts.OpensyncStats.Report; @Component public class OpensyncMqttClient implements ApplicationListener { + public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30; + private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class); private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA"); @@ -112,7 +115,7 @@ public class OpensyncMqttClient implements ApplicationListener REPORT_PROCESSING_THRESHOLD_SEC) { + try { + LOG.warn("Processing threshold exceeded for {}. Elapsed processing time {} seconds. MqttMessage: {}",mqttMsg.getTopic().split("/")[2], + elapsedTimeSeconds, jsonPrinter.print(statsReport)); + } catch (InvalidProtocolBufferException e) { + LOG.error("Error parsing Report from protobuffer for Topic {}", mqttMsg.getTopic(), e); + } + } + if (LOG.isTraceEnabled()) + LOG.trace("Completed publishing service metrics from received mqtt stats"); + } + }; + sender.start(); + } catch (Exception e) { String msgStr = new String(mqttMsg.getPayload(), utf8); LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr);