From 210976887f88c58bc4a6de9fed51c1e01026205b Mon Sep 17 00:00:00 2001 From: Mike Hansen Date: Wed, 23 Jun 2021 17:43:01 -0400 Subject: [PATCH] MQTT Client analysis and optimization Signed-off-by: Mike Hansen --- .../OpensyncExternalIntegrationCloud.java | 10 -- .../integration/utils/MqttStatsPublisher.java | 95 ------------ .../OpensyncExternalIntegrationInterface.java | 4 - .../opensync/mqtt/OpensyncMqttClient.java | 145 +++++++----------- 4 files changed, 58 insertions(+), 196 deletions(-) diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationCloud.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationCloud.java index c869b14..d8b140a 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationCloud.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationCloud.java @@ -1067,16 +1067,6 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra mqttMessageProcessor.processMqttMessage(topic, report); } - @Override - public void processMqttMessage(String topic, FlowReport flowReport) { - mqttMessageProcessor.processMqttMessage(topic, flowReport); - } - - @Override - public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { - mqttMessageProcessor.processMqttMessage(topic, wcStatsReport); - } - @Override public void wifiVIFStateDbTableUpdate(List vifStateTables, String apId) { LOG.debug("Received Wifi_VIF_State table update for AP {}", apId); diff --git a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java index 5f91c65..4f4bfd3 100644 --- a/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java +++ b/opensync-ext-cloud/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/utils/MqttStatsPublisher.java @@ -154,43 +154,6 @@ public class MqttStatsPublisher { @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") private int memoryUtilThresholdPct; - public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { - LOG.info("Received WCStatsReport {}", wcStatsReport.toString()); - - LOG.debug("Received report on topic {}", topic); - int customerId = extractCustomerIdFromTopic(topic); - - long equipmentId = extractEquipmentIdFromTopic(topic); - if ((equipmentId <= 0) || (customerId <= 0)) { - LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); - return; - } - - String apId = extractApIdFromTopic(topic); - - if (apId == null) { - LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); - return; - } - - if (LOG.isTraceEnabled()) { - // prepare a JSONPrinter to format protobuf messages as - // json - List protobufDescriptors = new ArrayList<>(); - protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); - TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); - JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); - - try { - LOG.trace("MQTT IpDnsTelemetry.wcStatsReport = {}", jsonPrinter.print(wcStatsReport)); - - } catch (InvalidProtocolBufferException e1) { - LOG.error("Couldn't parse IpDnsTelemetry.wcStatsReport.", e1); - } - } - - } - public void processMqttMessage(String topic, Report report) { LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); int customerId = extractCustomerIdFromTopic(topic); @@ -200,9 +163,6 @@ public class MqttStatsPublisher { LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); return; } - - // gatewayController.updateActiveCustomer(customerId); - Equipment ce = equipmentServiceInterface.getOrNull(equipmentId); if (ce == null) { LOG.warn("Cannot read equipment {}", apId); @@ -212,21 +172,6 @@ public class MqttStatsPublisher { long locationId = ce.getLocationId(); long profileId = ce.getProfileId(); - // prepare a JSONPrinter to format protobuf messages as - // json - // List protobufDescriptors = new ArrayList<>(); - // protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); - // TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); - // JsonFormat.Printer jsonPrinter = - // JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); - // - // try { - // LOG.trace(jsonPrinter.print(report)); - // - // } catch (InvalidProtocolBufferException e1) { - // LOG.error("Couldn't parse OpensyncStats.report.", e1); - // } - List metricRecordList = new ArrayList<>(); try { @@ -254,53 +199,13 @@ public class MqttStatsPublisher { }); cloudEventDispatcherInterface.publishMetrics(metricRecordList); } - publishEvents(report, customerId, equipmentId, apId, locationId); - // handleRssiMetrics(metricRecordList, report, customerId, - // equipmentId, locationId); - } catch (Exception e) { LOG.error("Exception when processing stats messages from AP", e); } } - public void processMqttMessage(String topic, FlowReport flowReport) { - - LOG.info("Received report on topic {}", topic); - int customerId = extractCustomerIdFromTopic(topic); - - long equipmentId = extractEquipmentIdFromTopic(topic); - if ((equipmentId <= 0) || (customerId <= 0)) { - LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); - return; - } - - String apId = extractApIdFromTopic(topic); - - if (apId == null) { - LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); - return; - } - - if (LOG.isTraceEnabled()) { - // prepare a JSONPrinter to format protobuf messages as - // json - List protobufDescriptors = new ArrayList<>(); - protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); - TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); - JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); - - try { - LOG.trace("MQTT NetworkMetadata.flowReport = {}", jsonPrinter.print(flowReport)); - - } catch (InvalidProtocolBufferException e1) { - LOG.error("Couldn't parse NetworkMetadata.flowReport.", e1); - } - } - - } - public void publishSystemEventFromTableStateMonitor(SystemEvent event) { LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event); cloudEventDispatcherInterface.publishEvent(event); diff --git a/opensync-ext-interface/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationInterface.java b/opensync-ext-interface/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationInterface.java index 914f1ed..f2525d9 100644 --- a/opensync-ext-interface/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationInterface.java +++ b/opensync-ext-interface/src/main/java/com/telecominfraproject/wlan/opensync/external/integration/OpensyncExternalIntegrationInterface.java @@ -38,10 +38,6 @@ public interface OpensyncExternalIntegrationInterface { void processMqttMessage(String topic, Report report); - void processMqttMessage(String topic, FlowReport flowReport); - - void processMqttMessage(String topic, WCStatsReport wcStatsReport); - void wifiAssociatedClientsDbTableUpdate(List wifiAssociatedClients, String apId); void wifiAssociatedClientsDbTableDelete(String deletedClientMac, String apId); diff --git a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java index 150666f..de3ad90 100644 --- a/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java +++ b/opensync-gateway/src/main/java/com/telecominfraproject/wlan/opensync/mqtt/OpensyncMqttClient.java @@ -1,3 +1,4 @@ + package com.telecominfraproject.wlan.opensync.mqtt; import java.nio.charset.Charset; @@ -5,11 +6,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.fusesource.mqtt.client.FutureConnection; +import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PINGRESP; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -37,10 +42,6 @@ import com.telecominfraproject.wlan.opensync.util.ZlibUtil; import sts.OpensyncStats; import sts.OpensyncStats.Report; -import traffic.NetworkMetadata; -import traffic.NetworkMetadata.FlowReport; -import wc.stats.IpDnsTelemetry; -import wc.stats.IpDnsTelemetry.WCStatsReport; @Profile("mqtt_receiver") @Component @@ -54,15 +55,11 @@ public class OpensyncMqttClient implements ApplicationListener protobufDescriptors = new ArrayList<>(); protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); - protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); - protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); - JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields() - .omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); + JsonFormat.Printer jsonPrinter = + JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); // main loop - receive messages while (true) { - Message mqttMsg = futureConnection.receive().await(); + Message mqttMsg = blockingConnection.receive(); if (mqttMsg == null) { continue; @@ -197,23 +213,11 @@ public class OpensyncMqttClient implements ApplicationListener