[WIFI-3165] Keep last stats received timestamp in gateway client session map

Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
Mike Hansen
2021-07-19 11:01:13 -04:00
parent 1070708b44
commit 6264a8c490
6 changed files with 80 additions and 59 deletions

View File

@@ -36,6 +36,8 @@ import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import com.netflix.servo.tag.TagList;
import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags;
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSession;
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSessionMapInterface;
import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface;
import com.telecominfraproject.wlan.opensync.util.ZlibUtil;
@@ -46,19 +48,10 @@ import sts.OpensyncStats.Report;
@Component
public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> {
// private static final String METRICS_WKR_PFX = "metrics-wkr-";
// public static final int REPORT_PROCESSING_THRESHOLD_SEC = 30;
private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class);
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
private static final Logger MQTT_TRACER_LOG = LoggerFactory.getLogger("MQTT_CLIENT_TRACER");
// private static final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new
// CustomizableThreadFactory(METRICS_WKR_PFX));
public static Charset utf8 = Charset.forName("UTF-8");
private final TagList tags = CloudMetricsTags.commonTags;
@@ -71,6 +64,9 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
@Autowired
private StatsPublisherInterface statsPublisher;
@Autowired
private OvsdbSessionMapInterface ovsdbSessionMapInterface;
// dtop: use anonymous constructor to ensure that the following code always
// get executed,
@@ -153,37 +149,6 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
mqtt.setClientId("opensync_mqtt");
mqtt.setUserName(username);
mqtt.setPassword(password);
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
switch (frame.messageType()) {
case PINGRESP.TYPE:
// PINGs don't want to fill log
MQTT_TRACER_LOG.trace("OpensyncMqttClient Rx: {}", frame);
break;
default:
MQTT_TRACER_LOG.info("OpensyncMqttClient Rx: {}", frame);
}
}
@Override
public void onSend(MQTTFrame frame) {
switch (frame.messageType()) {
case PINGREQ.TYPE:
// PINGs don't want to fill log
MQTT_TRACER_LOG.trace("OpensyncMqttClient Tx: {}", frame);
break;
default:
MQTT_TRACER_LOG.info("OpensyncMqttClient Tx: {}", frame);
}
}
@Override
public void debug(String message, Object... args) {
MQTT_TRACER_LOG.info(String.format(message, args));
}
});
blockingConnection = mqtt.blockingConnection();
blockingConnection.connect();
@@ -228,14 +193,22 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
// Only supported protobuf on the TIP opensync APs is Report
Report statsReport = Report.parseFrom(payload);
mqttMsg.ack();
String apId = extractApIdFromTopic(mqttMsg.getTopic());
if (apId != null) {
OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(extractApIdFromTopic(mqttMsg.getTopic()));
if (ovsdbSession != null) {
ovsdbSession.setMostRecentStatsTimestamp(System.currentTimeMillis());
LOG.debug("Last metrics received from AP updated to {}",ovsdbSession.toString());
} else {
LOG.debug("No ovsdb session exists for this AP {}",apId);
}
}
MQTT_LOG.info("Topic {}\n{}", mqttMsg.getTopic(), jsonPrinter.print(statsReport));
statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport);
LOG.debug("Dispatched report for topic {} to backend for processing", mqttMsg.getTopic());
} catch (Exception e) {
String msgStr = new String(mqttMsg.getPayload(), utf8);
LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr);
LOG.error("Exception processing topic for message {}",mqttMsg, e);
} finally {
stopwatchTimerMessageProcess.stop();
}
@@ -271,4 +244,26 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
mqttClientThread.interrupt();
}
}
/**
* @param topic
* @return apId extracted from the topic name, or null if it cannot be
* extracted
*/
static String extractApIdFromTopic(String topic) {
// Topic is formatted as
// "/ap/"+clientCn+"_"+ret.serialNumber+"/opensync"
if (topic == null) {
return null;
}
String[] parts = topic.split("/");
if (parts.length < 3) {
return null;
}
// apId is the third element in the topic
return parts[2];
}
}