mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-10-30 18:17:59 +00:00 
			
		
		
		
	MQTT Client analysis and optimization
Signed-off-by: Mike Hansen <mike.hansen@connectus.ai>
This commit is contained in:
		| @@ -1067,16 +1067,6 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
|         mqttMessageProcessor.processMqttMessage(topic, report); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void processMqttMessage(String topic, FlowReport flowReport) { | ||||
|         mqttMessageProcessor.processMqttMessage(topic, flowReport); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { | ||||
|         mqttMessageProcessor.processMqttMessage(topic, wcStatsReport); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void wifiVIFStateDbTableUpdate(List<OpensyncAPVIFState> vifStateTables, String apId) { | ||||
|         LOG.debug("Received Wifi_VIF_State table update for AP {}", apId); | ||||
|   | ||||
| @@ -154,43 +154,6 @@ public class MqttStatsPublisher { | ||||
|     @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") | ||||
|     private int memoryUtilThresholdPct; | ||||
|  | ||||
|     public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { | ||||
|         LOG.info("Received WCStatsReport {}", wcStatsReport.toString()); | ||||
|  | ||||
|         LOG.debug("Received report on topic {}", topic); | ||||
|         int customerId = extractCustomerIdFromTopic(topic); | ||||
|  | ||||
|         long equipmentId = extractEquipmentIdFromTopic(topic); | ||||
|         if ((equipmentId <= 0) || (customerId <= 0)) { | ||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         String apId = extractApIdFromTopic(topic); | ||||
|  | ||||
|         if (apId == null) { | ||||
|             LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         if (LOG.isTraceEnabled()) { | ||||
|             // prepare a JSONPrinter to format protobuf messages as | ||||
|             // json | ||||
|             List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
|             protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); | ||||
|             TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
|             JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); | ||||
|  | ||||
|             try { | ||||
|                 LOG.trace("MQTT IpDnsTelemetry.wcStatsReport = {}", jsonPrinter.print(wcStatsReport)); | ||||
|  | ||||
|             } catch (InvalidProtocolBufferException e1) { | ||||
|                 LOG.error("Couldn't parse IpDnsTelemetry.wcStatsReport.", e1); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public void processMqttMessage(String topic, Report report) { | ||||
|         LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); | ||||
|         int customerId = extractCustomerIdFromTopic(topic); | ||||
| @@ -200,9 +163,6 @@ public class MqttStatsPublisher { | ||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         // gatewayController.updateActiveCustomer(customerId); | ||||
|  | ||||
|         Equipment ce = equipmentServiceInterface.getOrNull(equipmentId); | ||||
|         if (ce == null) { | ||||
|             LOG.warn("Cannot read equipment {}", apId); | ||||
| @@ -212,21 +172,6 @@ public class MqttStatsPublisher { | ||||
|         long locationId = ce.getLocationId(); | ||||
|         long profileId = ce.getProfileId(); | ||||
|  | ||||
|         // prepare a JSONPrinter to format protobuf messages as | ||||
|         // json | ||||
|         // List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
|         // protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); | ||||
|         // TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
|         // JsonFormat.Printer jsonPrinter = | ||||
|         // JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); | ||||
|         // | ||||
|         // try { | ||||
|         // LOG.trace(jsonPrinter.print(report)); | ||||
|         // | ||||
|         // } catch (InvalidProtocolBufferException e1) { | ||||
|         // LOG.error("Couldn't parse OpensyncStats.report.", e1); | ||||
|         // } | ||||
|  | ||||
|         List<ServiceMetric> metricRecordList = new ArrayList<>(); | ||||
|  | ||||
|         try { | ||||
| @@ -254,53 +199,13 @@ public class MqttStatsPublisher { | ||||
|                 }); | ||||
|                 cloudEventDispatcherInterface.publishMetrics(metricRecordList); | ||||
|             } | ||||
|  | ||||
|             publishEvents(report, customerId, equipmentId, apId, locationId); | ||||
|             // handleRssiMetrics(metricRecordList, report, customerId, | ||||
|             // equipmentId, locationId); | ||||
|  | ||||
|         } catch (Exception e) { | ||||
|             LOG.error("Exception when processing stats messages from AP", e); | ||||
|         } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public void processMqttMessage(String topic, FlowReport flowReport) { | ||||
|  | ||||
|         LOG.info("Received report on topic {}", topic); | ||||
|         int customerId = extractCustomerIdFromTopic(topic); | ||||
|  | ||||
|         long equipmentId = extractEquipmentIdFromTopic(topic); | ||||
|         if ((equipmentId <= 0) || (customerId <= 0)) { | ||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         String apId = extractApIdFromTopic(topic); | ||||
|  | ||||
|         if (apId == null) { | ||||
|             LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         if (LOG.isTraceEnabled()) { | ||||
|             // prepare a JSONPrinter to format protobuf messages as | ||||
|             // json | ||||
|             List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
|             protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); | ||||
|             TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
|             JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); | ||||
|  | ||||
|             try { | ||||
|                 LOG.trace("MQTT NetworkMetadata.flowReport = {}", jsonPrinter.print(flowReport)); | ||||
|  | ||||
|             } catch (InvalidProtocolBufferException e1) { | ||||
|                 LOG.error("Couldn't parse NetworkMetadata.flowReport.", e1); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public void publishSystemEventFromTableStateMonitor(SystemEvent event) { | ||||
|         LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event); | ||||
|         cloudEventDispatcherInterface.publishEvent(event); | ||||
|   | ||||
| @@ -38,10 +38,6 @@ public interface OpensyncExternalIntegrationInterface { | ||||
|  | ||||
|     void processMqttMessage(String topic, Report report); | ||||
|  | ||||
|     void processMqttMessage(String topic, FlowReport flowReport); | ||||
|  | ||||
|     void processMqttMessage(String topic, WCStatsReport wcStatsReport); | ||||
|  | ||||
|     void wifiAssociatedClientsDbTableUpdate(List<OpensyncWifiAssociatedClients> wifiAssociatedClients, String apId); | ||||
|  | ||||
|     void wifiAssociatedClientsDbTableDelete(String deletedClientMac, String apId); | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
|  | ||||
| package com.telecominfraproject.wlan.opensync.mqtt; | ||||
|  | ||||
| import java.nio.charset.Charset; | ||||
| @@ -5,11 +6,15 @@ import java.util.ArrayList; | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
|  | ||||
| import org.fusesource.mqtt.client.FutureConnection; | ||||
| import org.fusesource.mqtt.client.BlockingConnection; | ||||
| import org.fusesource.mqtt.client.MQTT; | ||||
| import org.fusesource.mqtt.client.Message; | ||||
| import org.fusesource.mqtt.client.QoS; | ||||
| import org.fusesource.mqtt.client.Topic; | ||||
| import org.fusesource.mqtt.client.Tracer; | ||||
| import org.fusesource.mqtt.codec.MQTTFrame; | ||||
| import org.fusesource.mqtt.codec.PINGREQ; | ||||
| import org.fusesource.mqtt.codec.PINGRESP; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| @@ -37,10 +42,6 @@ import com.telecominfraproject.wlan.opensync.util.ZlibUtil; | ||||
|  | ||||
| import sts.OpensyncStats; | ||||
| import sts.OpensyncStats.Report; | ||||
| import traffic.NetworkMetadata; | ||||
| import traffic.NetworkMetadata.FlowReport; | ||||
| import wc.stats.IpDnsTelemetry; | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; | ||||
|  | ||||
| @Profile("mqtt_receiver") | ||||
| @Component | ||||
| @@ -54,15 +55,11 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|  | ||||
|     private final TagList tags = CloudMetricsTags.commonTags; | ||||
|  | ||||
|     private final Counter messagesReceived = new BasicCounter( | ||||
|             MonitorConfig.builder("osgw-mqtt-messagesReceived").withTags(tags).build()); | ||||
|     private final Counter messagesReceived = new BasicCounter(MonitorConfig.builder("osgw-mqtt-messagesReceived").withTags(tags).build()); | ||||
|  | ||||
|     private final Counter messageBytesReceived = new BasicCounter( | ||||
|             MonitorConfig.builder("osgw-mqtt-messageBytesReceived").withTags(tags).build()); | ||||
|      | ||||
|     private final Timer timerMessageProcess = new BasicTimer( | ||||
|             MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build()); | ||||
|     private final Counter messageBytesReceived = new BasicCounter(MonitorConfig.builder("osgw-mqtt-messageBytesReceived").withTags(tags).build()); | ||||
|  | ||||
|     private final Timer timerMessageProcess = new BasicTimer(MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build()); | ||||
|  | ||||
|     @Autowired | ||||
|     private OpensyncExternalIntegrationInterface extIntegrationInterface; | ||||
| @@ -85,8 +82,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|  | ||||
|     public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext, | ||||
|             @Value("${tip.wlan.mqttBroker.address.internal:testportal.123wlan.com}") String mqttBrokerAddress, | ||||
|             @Value("${tip.wlan.mqttBroker.listenPort:1883}") int mqttBrokerListenPort, | ||||
|             @Value("${tip.wlan.mqttBroker.user:admin}") String username, | ||||
|             @Value("${tip.wlan.mqttBroker.listenPort:1883}") int mqttBrokerListenPort, @Value("${tip.wlan.mqttBroker.user:admin}") String username, | ||||
|             @Value("${tip.wlan.mqttBroker.password:admin}") String password, | ||||
|             @Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation, | ||||
|             @Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword, | ||||
| @@ -114,7 +110,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|             @Override | ||||
|             public void run() { | ||||
|                 while (keepReconnecting) { | ||||
|                     FutureConnection futureConnection = null; | ||||
|                     BlockingConnection blockingConnection = null; | ||||
|                     try { | ||||
|                         Thread.sleep(5000); | ||||
|  | ||||
| @@ -144,24 +140,46 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|                          */ | ||||
|  | ||||
|                         MQTT mqtt = new MQTT(); | ||||
|                         // mqtt.setHost("tcp://192.168.0.137:61616"); | ||||
|                         mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort); | ||||
|                         LOG.info("Connecting to MQTT broker at {}", mqtt.getHost()); | ||||
|                         mqtt.setClientId("opensync_mqtt"); | ||||
|                         mqtt.setUserName(username); | ||||
|                         mqtt.setPassword(password); | ||||
|                         // Note: the following does not work with the | ||||
|                         // serverContext, | ||||
|                         // it has to be the | ||||
|                         // clientContext | ||||
|                         // mqtt.setSslContext(((JdkSslContext) | ||||
|                         // sslContext).context()); | ||||
|                         // For now we'll rely on regular SSLContext from the JDK | ||||
|                         mqtt.setTracer(new Tracer() { | ||||
|                             @Override | ||||
|                             public void onReceive(MQTTFrame frame) { | ||||
|                                 switch (frame.messageType()) { | ||||
|                                     case PINGREQ.TYPE: | ||||
|                                     case PINGRESP.TYPE: | ||||
|                                         // PINGs don't want to fill log | ||||
|                                         LOG.trace("MQTT Client Received: {}", frame); | ||||
|                                         break; | ||||
|                                     default: | ||||
|                                         LOG.debug("MQTT Client Received: {}", frame); | ||||
|                                 } | ||||
|                             } | ||||
|  | ||||
|                         // TODO: revisit this blocking connection, change it to | ||||
|                         // futureConnection | ||||
|                         futureConnection = mqtt.futureConnection(); | ||||
|                         futureConnection.connect(); | ||||
|                             @Override | ||||
|                             public void onSend(MQTTFrame frame) { | ||||
|                                 switch (frame.messageType()) { | ||||
|                                     case PINGREQ.TYPE: | ||||
|                                     case PINGRESP.TYPE: | ||||
|                                         // PINGs don't want to fill log | ||||
|                                         LOG.trace("MQTT Client Sent: {}", frame); | ||||
|                                         break; | ||||
|                                     default: | ||||
|                                         LOG.debug("MQTT Client Sent: {}", frame); | ||||
|                                 } | ||||
|                             } | ||||
|  | ||||
|                             @Override | ||||
|                             public void debug(String message, Object... args) { | ||||
|                                 LOG.debug("MQTT Client debugging messages: {}", String.format(message, args)); | ||||
|                             } | ||||
|                         }); | ||||
|  | ||||
|                         blockingConnection = mqtt.blockingConnection(); | ||||
|                         blockingConnection.connect(); | ||||
|  | ||||
|                         LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); | ||||
|  | ||||
| @@ -171,24 +189,22 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|                         // new Topic("#", QoS.AT_LEAST_ONCE), | ||||
|                         // new Topic("test/#", QoS.EXACTLY_ONCE), | ||||
|                         // new Topic("foo/+/bar", QoS.AT_LEAST_ONCE) | ||||
|                         Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), }; | ||||
|                         Topic[] topics = {new Topic("#", QoS.AT_LEAST_ONCE),}; | ||||
|  | ||||
|                         futureConnection.subscribe(topics); | ||||
|                         blockingConnection.subscribe(topics); | ||||
|                         LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); | ||||
|  | ||||
|                         // prepare a JSONPrinter to format protobuf messages as | ||||
|                         // json | ||||
|                         List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
|                         protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); | ||||
|                         protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); | ||||
|                         protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); | ||||
|                         TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
|                         JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields() | ||||
|                                 .omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); | ||||
|                         JsonFormat.Printer jsonPrinter = | ||||
|                                 JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); | ||||
|  | ||||
|                         // main loop - receive messages | ||||
|                         while (true) { | ||||
|                             Message mqttMsg = futureConnection.receive().await(); | ||||
|                             Message mqttMsg = blockingConnection.receive(); | ||||
|  | ||||
|                             if (mqttMsg == null) { | ||||
|                                 continue; | ||||
| @@ -197,23 +213,11 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|                             LOG.debug("MQTT Topic {}", mqttMsg.getTopic()); | ||||
|  | ||||
|                             byte payload[] = mqttMsg.getPayload(); | ||||
|                             // we acknowledge right after receive because: | ||||
|                             // a. none of the stats messages are so important | ||||
|                             // that | ||||
|                             // we cannot skip one | ||||
|                             // b. if there's some kind of problem with the | ||||
|                             // message | ||||
|                             // (decoding or processing) | ||||
|                             // - we want to move on as quickly as possible and | ||||
|                             // not | ||||
|                             // let it get stuck in the | ||||
|                             // queue | ||||
|                             mqttMsg.ack(); | ||||
|  | ||||
|                             messagesReceived.increment(); | ||||
|                             messageBytesReceived.increment(payload.length); | ||||
|                             Stopwatch stopwatchTimerMessageProcess = timerMessageProcess.start(); | ||||
|                              | ||||
|  | ||||
|                             LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length); | ||||
|  | ||||
|                             if (payload[0] == 0x78) { | ||||
| @@ -223,60 +227,27 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | ||||
|                                 payload = ZlibUtil.decompress(payload); | ||||
|                             } | ||||
|  | ||||
|  | ||||
|                             // attempt to parse the message as protobuf | ||||
|                             MessageOrBuilder encodedMsg = null; | ||||
|                             try { | ||||
|  | ||||
|                                 // Only supported protobuf on the TIP opensync APs is Report | ||||
|                                 encodedMsg = Report.parseFrom(payload); | ||||
|  | ||||
|                                 MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), | ||||
|                                         jsonPrinter.print(encodedMsg)); | ||||
|  | ||||
|                                 mqttMsg.ack(); | ||||
|                                 MQTT_LOG.info("topic = {}\nReport = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||
|                                 extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg); | ||||
|  | ||||
|                             } catch (Exception e) { | ||||
|                                 try { | ||||
|                                     // not a opensync_stats report, attempt to | ||||
|                                     // deserialize as network_metadata | ||||
|                                     encodedMsg = FlowReport.parseFrom(payload); | ||||
|  | ||||
|                                     MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(), | ||||
|                                             jsonPrinter.print(encodedMsg)); | ||||
|  | ||||
|                                     extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), | ||||
|                                             (FlowReport) encodedMsg); | ||||
|                                 } catch (Exception e1) { | ||||
|  | ||||
|                                     try { | ||||
|                                         // not a opensync_stats report and not | ||||
|                                         // network_metadata report, attempt to | ||||
|                                         // deserialize as WCStatsReport | ||||
|                                         encodedMsg = WCStatsReport.parseFrom(payload); | ||||
|  | ||||
|                                         MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(), | ||||
|                                                 jsonPrinter.print(encodedMsg)); | ||||
|  | ||||
|  | ||||
|                                         extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), | ||||
|                                                 (WCStatsReport) encodedMsg); | ||||
|                                     } catch (Exception e2) { | ||||
|                                         String msgStr = new String(mqttMsg.getPayload(), utf8); | ||||
|                                         MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr); | ||||
|                                     } | ||||
|                                 } | ||||
|                                 String msgStr = new String(mqttMsg.getPayload(), utf8); | ||||
|                                 LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr); | ||||
|                             } finally { | ||||
|                                 stopwatchTimerMessageProcess.stop(); | ||||
|                             } | ||||
|  | ||||
|                         } | ||||
|  | ||||
|                     } catch (Exception e) { | ||||
|                         LOG.error("Exception in MQTT receiver", e); | ||||
|                     } finally { | ||||
|                         try { | ||||
|                             if (futureConnection != null) { | ||||
|                                 futureConnection.disconnect(); | ||||
|                             if (blockingConnection != null) { | ||||
|                                 blockingConnection.disconnect(); | ||||
|                             } | ||||
|                         } catch (Exception e1) { | ||||
|                             // do nothing | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Mike Hansen
					Mike Hansen