MQTT changes:

Remove MQTT message hop from OpensyncExternalIntegrationInterface
  Add separate logging for mqtt msg tracing

Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
Mike Hansen
2021-07-08 09:36:15 -04:00
parent 785dc0e6c4
commit b842c532a8
10 changed files with 87 additions and 34 deletions

View File

@@ -11,11 +11,11 @@
<name>opensync-ext-cloud</name>
<description>Configuration interface that provides config from the cloud services.</description>
<dependencies>
<!-- <dependency> -->
<!-- <groupId>com.telecominfraproject.wlan</groupId> -->
<!-- <artifactId>opensync-ext-interface</artifactId> -->
<!-- <version>0.0.1-SNAPSHOT</version> -->
<!-- </dependency> -->
<dependency>
<groupId>com.telecominfraproject.wlan</groupId>
<artifactId>opensync-ext-interface</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.telecominfraproject.wlan</groupId>
<artifactId>opensync-gateway</artifactId>

View File

@@ -76,7 +76,7 @@ import com.telecominfraproject.wlan.opensync.external.integration.models.Opensyn
import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAPVIFState;
import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAWLANNode;
import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncWifiAssociatedClients;
import com.telecominfraproject.wlan.opensync.external.integration.utils.MqttStatsPublisher;
import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface;
import com.telecominfraproject.wlan.opensync.ovsdb.dao.models.enumerations.DhcpFpDeviceType;
import com.telecominfraproject.wlan.opensync.util.OvsdbStringConstants;
import com.telecominfraproject.wlan.opensync.util.OvsdbToWlanCloudTypeMappingUtility;
@@ -147,7 +147,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra
@Autowired
private FirmwareServiceInterface firmwareServiceInterface;
@Autowired
private MqttStatsPublisher mqttMessageProcessor;
private StatsPublisherInterface mqttMessageProcessor;
@Autowired
private AlarmServiceInterface alarmServiceInterface;
@@ -1062,11 +1062,6 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra
return ret;
}
@Override
public void processMqttMessage(String topic, Report report) {
mqttMessageProcessor.processMqttMessage(topic, report);
}
@Override
public void wifiVIFStateDbTableUpdate(List<OpensyncAPVIFState> vifStateTables, String apId) {
LOG.debug("Received Wifi_VIF_State table update for AP {}", apId);

View File

@@ -124,7 +124,7 @@ import wc.stats.IpDnsTelemetry.WCStatsReport;
@org.springframework.context.annotation.Profile("opensync_cloud_config")
@Component
public class MqttStatsPublisher {
public class MqttStatsPublisher implements StatsPublisherInterface {
private static final Logger LOG = LoggerFactory.getLogger(MqttStatsPublisher.class);
@@ -154,6 +154,7 @@ public class MqttStatsPublisher {
@Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}")
private int memoryUtilThresholdPct;
@Override
public void processMqttMessage(String topic, Report report) {
LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID());
String apId = extractApIdFromTopic(topic);
@@ -202,6 +203,7 @@ public class MqttStatsPublisher {
}
@Override
public void publishSystemEventFromTableStateMonitor(SystemEvent event) {
LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event);
cloudEventDispatcherInterface.publishEvent(event);

View File

@@ -515,11 +515,6 @@ public class OpensyncExternalIntegrationCloudTest {
Mockito.when(clientServiceInterface.updateSession(ArgumentMatchers.any(ClientSession.class)))
.thenReturn(clientSession).thenReturn(clientSession2);
opensyncExternalIntegrationCloud.processMqttMessage(topic, report);
Mockito.verify(opensyncExternalIntegrationMqttProcessor, Mockito.times(1)).processMqttMessage(topic, report);
}
@Ignore

View File

@@ -116,7 +116,7 @@ public class MqttStatsPublisherTest {
static class Config {
@Bean
public MqttStatsPublisher mqttStatsPublisher() {
public StatsPublisherInterface mqttStatsPublisher() {
return new MqttStatsPublisher();
}

View File

@@ -36,8 +36,6 @@ public interface OpensyncExternalIntegrationInterface {
void wifiInetStateDbTableDelete(List<OpensyncAPInetState> inetStateTables, String apId);
void processMqttMessage(String topic, Report report);
void wifiAssociatedClientsDbTableUpdate(List<OpensyncWifiAssociatedClients> wifiAssociatedClients, String apId);
void wifiAssociatedClientsDbTableDelete(String deletedClientMac, String apId);

View File

@@ -0,0 +1,14 @@
package com.telecominfraproject.wlan.opensync.external.integration.utils;
import com.telecominfraproject.wlan.systemevent.models.SystemEvent;
import sts.OpensyncStats.Report;
public interface StatsPublisherInterface {
void processMqttMessage(String topic, Report report);
void publishSystemEventFromTableStateMonitor(SystemEvent event);
}

View File

@@ -30,6 +30,22 @@
</triggeringPolicy>
</appender>
<appender name="mqttMsgTracer" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/app/logs/mqttMsgTracer.log</file>
<append>true</append>
<encoder>
<pattern>%date %msg%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>/app/logs/mqttMsgTracer.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>3</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>20MB</maxFileSize>
</triggeringPolicy>
</appender>
<appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/app/logs/opensyncgw.log</file>
<append>true</append>
@@ -67,10 +83,14 @@
<logger name="com.vmware.ovsdb.service.OvsdbConnectionInfo" level="OFF"/>
<logger name="com.vmware.ovsdb.netty.OvsdbConnectionHandler" level="ERROR"/>
<logger name="MQTT_DATA" level="DEBUG" additivity="false">
<logger name="MQTT_DATA" level="INFO" additivity="false">
<appender-ref ref="mqttDataFile"/>
</logger>
<logger name="MQTT_TRACER_LOG" level="INFO" additivity="false">
<appender-ref ref="mqttMsgTracer"/>
</logger>
<root level="WARN">
<appender-ref ref="logfile"/>
</root>

View File

@@ -63,7 +63,8 @@
<logger name="com.vmware.ovsdb.service.OvsdbConnectionInfo" level="OFF"/>
<logger name="com.vmware.ovsdb.netty.OvsdbConnectionHandler" level="ERROR"/>
<logger name="MQTT_DATA" level="DEBUG"/>
<logger name="MQTT_DATA" level="INFO"/>
<logger name="MQTT_CLIENT_TRACER" level="DEBUG"/>
<!--
<logger name="org.springframework.security.web.authentication.preauth" level="DEBUG"/>

View File

@@ -13,9 +13,19 @@ import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,6 +49,7 @@ import com.netflix.servo.monitor.Timer;
import com.netflix.servo.tag.TagList;
import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags;
import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface;
import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface;
import com.telecominfraproject.wlan.opensync.util.ZlibUtil;
import sts.OpensyncStats;
@@ -54,6 +65,8 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
private static final Logger MQTT_TRACER_LOG = LoggerFactory.getLogger("MQTT_CLIENT_TRACER");
public static Charset utf8 = Charset.forName("UTF-8");
private final TagList tags = CloudMetricsTags.commonTags;
@@ -65,7 +78,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
private final Timer timerMessageProcess = new BasicTimer(MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build());
@Autowired
private OpensyncExternalIntegrationInterface extIntegrationInterface;
private StatsPublisherInterface statsPublisher;
// dtop: use anonymous constructor to ensure that the following code always
// get executed,
@@ -152,13 +165,21 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
@Override
public void onReceive(MQTTFrame frame) {
switch (frame.messageType()) {
case PINGREQ.TYPE:
case PINGRESP.TYPE:
// PINGs don't want to fill log
LOG.trace("OpensyncMqttClient Rx: {}", frame);
MQTT_TRACER_LOG.trace("OpensyncMqttClient Rx: {}", frame);
break;
case SUBACK.TYPE:
case CONNACK.TYPE:
case PUBACK.TYPE:
case PUBCOMP.TYPE:
case PUBREC.TYPE:
case PUBREL.TYPE:
case PUBLISH.TYPE:
MQTT_TRACER_LOG.info("OpensyncMqttClient Rx: {}", frame);
break;
default:
LOG.debug("OpensyncMqttClient Rx: {}", frame);
MQTT_TRACER_LOG.debug("OpensyncMqttClient Rx: {}", frame);
}
}
@@ -166,12 +187,18 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
public void onSend(MQTTFrame frame) {
switch (frame.messageType()) {
case PINGREQ.TYPE:
case PINGRESP.TYPE:
// PINGs don't want to fill log
LOG.trace("OpensyncMqttClient Tx: {}", frame);
MQTT_TRACER_LOG.trace("OpensyncMqttClient Tx: {}", frame);
break;
case SUBSCRIBE.TYPE:
case CONNECT.TYPE:
case DISCONNECT.TYPE:
case PUBACK.TYPE:
case PUBCOMP.TYPE:
MQTT_TRACER_LOG.info("OpensyncMqttClient Tx: {}", frame);
break;
default:
LOG.debug("OpensyncMqttClient Tx: {}", frame);
MQTT_TRACER_LOG.debug("OpensyncMqttClient Tx: {}", frame);
}
}
@@ -223,17 +250,18 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
}
// Only supported protobuf on the TIP opensync APs is Report
Report statsReport = Report.parseFrom(payload);
mqttMsg.ack();
if (LOG.isTraceEnabled())
LOG.trace("Received opensync stats report for topic = {}\nReport = {}", mqttMsg.getTopic(), statsReport.toString());
Thread sender = new Thread(statsReport.getNodeID() + "-worker") {
MQTT_LOG.info("Topic {}\n{}",mqttMsg.getTopic(), jsonPrinter.print(statsReport));
Thread sender = new Thread(mqttMsg.getTopic().split("/")[2] + "-worker") {
@Override
public void run() {
long startTime = System.nanoTime();
if (LOG.isTraceEnabled())
LOG.trace("Start publishing service metrics from received mqtt stats");
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), statsReport);
mqttMsg.ack();
statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport);
long elapsedTimeSeconds = TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (elapsedTimeSeconds > REPORT_PROCESSING_THRESHOLD_SEC) {
try {