mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-10-31 02:27:49 +00:00 
			
		
		
		
	Merge pull request #46 from Telecominfraproject/mqtt_client_analysis_and_optimization
MQTT Client analysis and optimization
This commit is contained in:
		| @@ -1067,16 +1067,6 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | |||||||
|         mqttMessageProcessor.processMqttMessage(topic, report); |         mqttMessageProcessor.processMqttMessage(topic, report); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @Override |  | ||||||
|     public void processMqttMessage(String topic, FlowReport flowReport) { |  | ||||||
|         mqttMessageProcessor.processMqttMessage(topic, flowReport); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     @Override |  | ||||||
|     public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { |  | ||||||
|         mqttMessageProcessor.processMqttMessage(topic, wcStatsReport); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     @Override |     @Override | ||||||
|     public void wifiVIFStateDbTableUpdate(List<OpensyncAPVIFState> vifStateTables, String apId) { |     public void wifiVIFStateDbTableUpdate(List<OpensyncAPVIFState> vifStateTables, String apId) { | ||||||
|         LOG.debug("Received Wifi_VIF_State table update for AP {}", apId); |         LOG.debug("Received Wifi_VIF_State table update for AP {}", apId); | ||||||
|   | |||||||
| @@ -154,43 +154,6 @@ public class MqttStatsPublisher { | |||||||
|     @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") |     @Value("${tip.wlan.mqttStatsPublisher.memoryUtilThresholdPct:70}") | ||||||
|     private int memoryUtilThresholdPct; |     private int memoryUtilThresholdPct; | ||||||
|  |  | ||||||
|     public void processMqttMessage(String topic, WCStatsReport wcStatsReport) { |  | ||||||
|         LOG.info("Received WCStatsReport {}", wcStatsReport.toString()); |  | ||||||
|  |  | ||||||
|         LOG.debug("Received report on topic {}", topic); |  | ||||||
|         int customerId = extractCustomerIdFromTopic(topic); |  | ||||||
|  |  | ||||||
|         long equipmentId = extractEquipmentIdFromTopic(topic); |  | ||||||
|         if ((equipmentId <= 0) || (customerId <= 0)) { |  | ||||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         String apId = extractApIdFromTopic(topic); |  | ||||||
|  |  | ||||||
|         if (apId == null) { |  | ||||||
|             LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if (LOG.isTraceEnabled()) { |  | ||||||
|             // prepare a JSONPrinter to format protobuf messages as |  | ||||||
|             // json |  | ||||||
|             List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); |  | ||||||
|             protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); |  | ||||||
|             TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); |  | ||||||
|             JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); |  | ||||||
|  |  | ||||||
|             try { |  | ||||||
|                 LOG.trace("MQTT IpDnsTelemetry.wcStatsReport = {}", jsonPrinter.print(wcStatsReport)); |  | ||||||
|  |  | ||||||
|             } catch (InvalidProtocolBufferException e1) { |  | ||||||
|                 LOG.error("Couldn't parse IpDnsTelemetry.wcStatsReport.", e1); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     public void processMqttMessage(String topic, Report report) { |     public void processMqttMessage(String topic, Report report) { | ||||||
|         LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); |         LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID()); | ||||||
|         int customerId = extractCustomerIdFromTopic(topic); |         int customerId = extractCustomerIdFromTopic(topic); | ||||||
| @@ -200,9 +163,6 @@ public class MqttStatsPublisher { | |||||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); |             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // gatewayController.updateActiveCustomer(customerId); |  | ||||||
|  |  | ||||||
|         Equipment ce = equipmentServiceInterface.getOrNull(equipmentId); |         Equipment ce = equipmentServiceInterface.getOrNull(equipmentId); | ||||||
|         if (ce == null) { |         if (ce == null) { | ||||||
|             LOG.warn("Cannot read equipment {}", apId); |             LOG.warn("Cannot read equipment {}", apId); | ||||||
| @@ -212,21 +172,6 @@ public class MqttStatsPublisher { | |||||||
|         long locationId = ce.getLocationId(); |         long locationId = ce.getLocationId(); | ||||||
|         long profileId = ce.getProfileId(); |         long profileId = ce.getProfileId(); | ||||||
|  |  | ||||||
|         // prepare a JSONPrinter to format protobuf messages as |  | ||||||
|         // json |  | ||||||
|         // List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); |  | ||||||
|         // protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); |  | ||||||
|         // TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); |  | ||||||
|         // JsonFormat.Printer jsonPrinter = |  | ||||||
|         // JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); |  | ||||||
|         // |  | ||||||
|         // try { |  | ||||||
|         // LOG.trace(jsonPrinter.print(report)); |  | ||||||
|         // |  | ||||||
|         // } catch (InvalidProtocolBufferException e1) { |  | ||||||
|         // LOG.error("Couldn't parse OpensyncStats.report.", e1); |  | ||||||
|         // } |  | ||||||
|  |  | ||||||
|         List<ServiceMetric> metricRecordList = new ArrayList<>(); |         List<ServiceMetric> metricRecordList = new ArrayList<>(); | ||||||
|  |  | ||||||
|         try { |         try { | ||||||
| @@ -254,53 +199,13 @@ public class MqttStatsPublisher { | |||||||
|                 }); |                 }); | ||||||
|                 cloudEventDispatcherInterface.publishMetrics(metricRecordList); |                 cloudEventDispatcherInterface.publishMetrics(metricRecordList); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             publishEvents(report, customerId, equipmentId, apId, locationId); |             publishEvents(report, customerId, equipmentId, apId, locationId); | ||||||
|             // handleRssiMetrics(metricRecordList, report, customerId, |  | ||||||
|             // equipmentId, locationId); |  | ||||||
|  |  | ||||||
|         } catch (Exception e) { |         } catch (Exception e) { | ||||||
|             LOG.error("Exception when processing stats messages from AP", e); |             LOG.error("Exception when processing stats messages from AP", e); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     public void processMqttMessage(String topic, FlowReport flowReport) { |  | ||||||
|  |  | ||||||
|         LOG.info("Received report on topic {}", topic); |  | ||||||
|         int customerId = extractCustomerIdFromTopic(topic); |  | ||||||
|  |  | ||||||
|         long equipmentId = extractEquipmentIdFromTopic(topic); |  | ||||||
|         if ((equipmentId <= 0) || (customerId <= 0)) { |  | ||||||
|             LOG.warn("Cannot determine equipment ids from topic {} - customerId {} equipmentId {}", topic, customerId, equipmentId); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         String apId = extractApIdFromTopic(topic); |  | ||||||
|  |  | ||||||
|         if (apId == null) { |  | ||||||
|             LOG.warn("Cannot determine AP id from topic {} - customerId {} equipmentId {} apId {}", topic, customerId, equipmentId, apId); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if (LOG.isTraceEnabled()) { |  | ||||||
|             // prepare a JSONPrinter to format protobuf messages as |  | ||||||
|             // json |  | ||||||
|             List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); |  | ||||||
|             protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); |  | ||||||
|             TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); |  | ||||||
|             JsonFormat.Printer jsonPrinter = JsonFormat.printer().preservingProtoFieldNames().includingDefaultValueFields().usingTypeRegistry(oldRegistry); |  | ||||||
|  |  | ||||||
|             try { |  | ||||||
|                 LOG.trace("MQTT NetworkMetadata.flowReport = {}", jsonPrinter.print(flowReport)); |  | ||||||
|  |  | ||||||
|             } catch (InvalidProtocolBufferException e1) { |  | ||||||
|                 LOG.error("Couldn't parse NetworkMetadata.flowReport.", e1); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     public void publishSystemEventFromTableStateMonitor(SystemEvent event) { |     public void publishSystemEventFromTableStateMonitor(SystemEvent event) { | ||||||
|         LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event); |         LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event); | ||||||
|         cloudEventDispatcherInterface.publishEvent(event); |         cloudEventDispatcherInterface.publishEvent(event); | ||||||
|   | |||||||
| @@ -38,10 +38,6 @@ public interface OpensyncExternalIntegrationInterface { | |||||||
|  |  | ||||||
|     void processMqttMessage(String topic, Report report); |     void processMqttMessage(String topic, Report report); | ||||||
|  |  | ||||||
|     void processMqttMessage(String topic, FlowReport flowReport); |  | ||||||
|  |  | ||||||
|     void processMqttMessage(String topic, WCStatsReport wcStatsReport); |  | ||||||
|  |  | ||||||
|     void wifiAssociatedClientsDbTableUpdate(List<OpensyncWifiAssociatedClients> wifiAssociatedClients, String apId); |     void wifiAssociatedClientsDbTableUpdate(List<OpensyncWifiAssociatedClients> wifiAssociatedClients, String apId); | ||||||
|  |  | ||||||
|     void wifiAssociatedClientsDbTableDelete(String deletedClientMac, String apId); |     void wifiAssociatedClientsDbTableDelete(String deletedClientMac, String apId); | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  |  | ||||||
| package com.telecominfraproject.wlan.opensync.mqtt; | package com.telecominfraproject.wlan.opensync.mqtt; | ||||||
|  |  | ||||||
| import java.nio.charset.Charset; | import java.nio.charset.Charset; | ||||||
| @@ -5,11 +6,15 @@ import java.util.ArrayList; | |||||||
| import java.util.Arrays; | import java.util.Arrays; | ||||||
| import java.util.List; | import java.util.List; | ||||||
|  |  | ||||||
| import org.fusesource.mqtt.client.FutureConnection; | import org.fusesource.mqtt.client.BlockingConnection; | ||||||
| import org.fusesource.mqtt.client.MQTT; | import org.fusesource.mqtt.client.MQTT; | ||||||
| import org.fusesource.mqtt.client.Message; | import org.fusesource.mqtt.client.Message; | ||||||
| import org.fusesource.mqtt.client.QoS; | import org.fusesource.mqtt.client.QoS; | ||||||
| import org.fusesource.mqtt.client.Topic; | import org.fusesource.mqtt.client.Topic; | ||||||
|  | import org.fusesource.mqtt.client.Tracer; | ||||||
|  | import org.fusesource.mqtt.codec.MQTTFrame; | ||||||
|  | import org.fusesource.mqtt.codec.PINGREQ; | ||||||
|  | import org.fusesource.mqtt.codec.PINGRESP; | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||||
| import org.springframework.beans.factory.annotation.Autowired; | import org.springframework.beans.factory.annotation.Autowired; | ||||||
| @@ -37,10 +42,6 @@ import com.telecominfraproject.wlan.opensync.util.ZlibUtil; | |||||||
|  |  | ||||||
| import sts.OpensyncStats; | import sts.OpensyncStats; | ||||||
| import sts.OpensyncStats.Report; | import sts.OpensyncStats.Report; | ||||||
| import traffic.NetworkMetadata; |  | ||||||
| import traffic.NetworkMetadata.FlowReport; |  | ||||||
| import wc.stats.IpDnsTelemetry; |  | ||||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; |  | ||||||
|  |  | ||||||
| @Profile("mqtt_receiver") | @Profile("mqtt_receiver") | ||||||
| @Component | @Component | ||||||
| @@ -54,15 +55,11 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|  |  | ||||||
|     private final TagList tags = CloudMetricsTags.commonTags; |     private final TagList tags = CloudMetricsTags.commonTags; | ||||||
|  |  | ||||||
|     private final Counter messagesReceived = new BasicCounter( |     private final Counter messagesReceived = new BasicCounter(MonitorConfig.builder("osgw-mqtt-messagesReceived").withTags(tags).build()); | ||||||
|             MonitorConfig.builder("osgw-mqtt-messagesReceived").withTags(tags).build()); |  | ||||||
|  |  | ||||||
|     private final Counter messageBytesReceived = new BasicCounter( |     private final Counter messageBytesReceived = new BasicCounter(MonitorConfig.builder("osgw-mqtt-messageBytesReceived").withTags(tags).build()); | ||||||
|             MonitorConfig.builder("osgw-mqtt-messageBytesReceived").withTags(tags).build()); |  | ||||||
|      |  | ||||||
|     private final Timer timerMessageProcess = new BasicTimer( |  | ||||||
|             MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build()); |  | ||||||
|  |  | ||||||
|  |     private final Timer timerMessageProcess = new BasicTimer(MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build()); | ||||||
|  |  | ||||||
|     @Autowired |     @Autowired | ||||||
|     private OpensyncExternalIntegrationInterface extIntegrationInterface; |     private OpensyncExternalIntegrationInterface extIntegrationInterface; | ||||||
| @@ -85,8 +82,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|  |  | ||||||
|     public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext, |     public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext, | ||||||
|             @Value("${tip.wlan.mqttBroker.address.internal:testportal.123wlan.com}") String mqttBrokerAddress, |             @Value("${tip.wlan.mqttBroker.address.internal:testportal.123wlan.com}") String mqttBrokerAddress, | ||||||
|             @Value("${tip.wlan.mqttBroker.listenPort:1883}") int mqttBrokerListenPort, |             @Value("${tip.wlan.mqttBroker.listenPort:1883}") int mqttBrokerListenPort, @Value("${tip.wlan.mqttBroker.user:admin}") String username, | ||||||
|             @Value("${tip.wlan.mqttBroker.user:admin}") String username, |  | ||||||
|             @Value("${tip.wlan.mqttBroker.password:admin}") String password, |             @Value("${tip.wlan.mqttBroker.password:admin}") String password, | ||||||
|             @Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation, |             @Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation, | ||||||
|             @Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword, |             @Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword, | ||||||
| @@ -114,7 +110,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|             @Override |             @Override | ||||||
|             public void run() { |             public void run() { | ||||||
|                 while (keepReconnecting) { |                 while (keepReconnecting) { | ||||||
|                     FutureConnection futureConnection = null; |                     BlockingConnection blockingConnection = null; | ||||||
|                     try { |                     try { | ||||||
|                         Thread.sleep(5000); |                         Thread.sleep(5000); | ||||||
|  |  | ||||||
| @@ -144,24 +140,46 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|                          */ |                          */ | ||||||
|  |  | ||||||
|                         MQTT mqtt = new MQTT(); |                         MQTT mqtt = new MQTT(); | ||||||
|                         // mqtt.setHost("tcp://192.168.0.137:61616"); |  | ||||||
|                         mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort); |                         mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort); | ||||||
|                         LOG.info("Connecting to MQTT broker at {}", mqtt.getHost()); |                         LOG.info("Connecting to MQTT broker at {}", mqtt.getHost()); | ||||||
|                         mqtt.setClientId("opensync_mqtt"); |                         mqtt.setClientId("opensync_mqtt"); | ||||||
|                         mqtt.setUserName(username); |                         mqtt.setUserName(username); | ||||||
|                         mqtt.setPassword(password); |                         mqtt.setPassword(password); | ||||||
|                         // Note: the following does not work with the |                         mqtt.setTracer(new Tracer() { | ||||||
|                         // serverContext, |                             @Override | ||||||
|                         // it has to be the |                             public void onReceive(MQTTFrame frame) { | ||||||
|                         // clientContext |                                 switch (frame.messageType()) { | ||||||
|                         // mqtt.setSslContext(((JdkSslContext) |                                     case PINGREQ.TYPE: | ||||||
|                         // sslContext).context()); |                                     case PINGRESP.TYPE: | ||||||
|                         // For now we'll rely on regular SSLContext from the JDK |                                         // PINGs don't want to fill log | ||||||
|  |                                         LOG.trace("MQTT Client Received: {}", frame); | ||||||
|  |                                         break; | ||||||
|  |                                     default: | ||||||
|  |                                         LOG.debug("MQTT Client Received: {}", frame); | ||||||
|  |                                 } | ||||||
|  |                             } | ||||||
|  |  | ||||||
|                         // TODO: revisit this blocking connection, change it to |                             @Override | ||||||
|                         // futureConnection |                             public void onSend(MQTTFrame frame) { | ||||||
|                         futureConnection = mqtt.futureConnection(); |                                 switch (frame.messageType()) { | ||||||
|                         futureConnection.connect(); |                                     case PINGREQ.TYPE: | ||||||
|  |                                     case PINGRESP.TYPE: | ||||||
|  |                                         // PINGs don't want to fill log | ||||||
|  |                                         LOG.trace("MQTT Client Sent: {}", frame); | ||||||
|  |                                         break; | ||||||
|  |                                     default: | ||||||
|  |                                         LOG.debug("MQTT Client Sent: {}", frame); | ||||||
|  |                                 } | ||||||
|  |                             } | ||||||
|  |  | ||||||
|  |                             @Override | ||||||
|  |                             public void debug(String message, Object... args) { | ||||||
|  |                                 LOG.debug("MQTT Client debugging messages: {}", String.format(message, args)); | ||||||
|  |                             } | ||||||
|  |                         }); | ||||||
|  |  | ||||||
|  |                         blockingConnection = mqtt.blockingConnection(); | ||||||
|  |                         blockingConnection.connect(); | ||||||
|  |  | ||||||
|                         LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); |                         LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); | ||||||
|  |  | ||||||
| @@ -171,24 +189,22 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|                         // new Topic("#", QoS.AT_LEAST_ONCE), |                         // new Topic("#", QoS.AT_LEAST_ONCE), | ||||||
|                         // new Topic("test/#", QoS.EXACTLY_ONCE), |                         // new Topic("test/#", QoS.EXACTLY_ONCE), | ||||||
|                         // new Topic("foo/+/bar", QoS.AT_LEAST_ONCE) |                         // new Topic("foo/+/bar", QoS.AT_LEAST_ONCE) | ||||||
|                         Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), }; |                         Topic[] topics = {new Topic("#", QoS.AT_LEAST_ONCE),}; | ||||||
|  |  | ||||||
|                         futureConnection.subscribe(topics); |                         blockingConnection.subscribe(topics); | ||||||
|                         LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); |                         LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); | ||||||
|  |  | ||||||
|                         // prepare a JSONPrinter to format protobuf messages as |                         // prepare a JSONPrinter to format protobuf messages as | ||||||
|                         // json |                         // json | ||||||
|                         List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); |                         List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||||
|                         protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); |                         protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); | ||||||
|                         protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); |  | ||||||
|                         protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); |  | ||||||
|                         TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); |                         TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||||
|                         JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields() |                         JsonFormat.Printer jsonPrinter = | ||||||
|                                 .omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); |                                 JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); | ||||||
|  |  | ||||||
|                         // main loop - receive messages |                         // main loop - receive messages | ||||||
|                         while (true) { |                         while (true) { | ||||||
|                             Message mqttMsg = futureConnection.receive().await(); |                             Message mqttMsg = blockingConnection.receive(); | ||||||
|  |  | ||||||
|                             if (mqttMsg == null) { |                             if (mqttMsg == null) { | ||||||
|                                 continue; |                                 continue; | ||||||
| @@ -197,23 +213,11 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|                             LOG.debug("MQTT Topic {}", mqttMsg.getTopic()); |                             LOG.debug("MQTT Topic {}", mqttMsg.getTopic()); | ||||||
|  |  | ||||||
|                             byte payload[] = mqttMsg.getPayload(); |                             byte payload[] = mqttMsg.getPayload(); | ||||||
|                             // we acknowledge right after receive because: |  | ||||||
|                             // a. none of the stats messages are so important |  | ||||||
|                             // that |  | ||||||
|                             // we cannot skip one |  | ||||||
|                             // b. if there's some kind of problem with the |  | ||||||
|                             // message |  | ||||||
|                             // (decoding or processing) |  | ||||||
|                             // - we want to move on as quickly as possible and |  | ||||||
|                             // not |  | ||||||
|                             // let it get stuck in the |  | ||||||
|                             // queue |  | ||||||
|                             mqttMsg.ack(); |  | ||||||
|  |  | ||||||
|                             messagesReceived.increment(); |                             messagesReceived.increment(); | ||||||
|                             messageBytesReceived.increment(payload.length); |                             messageBytesReceived.increment(payload.length); | ||||||
|                             Stopwatch stopwatchTimerMessageProcess = timerMessageProcess.start(); |                             Stopwatch stopwatchTimerMessageProcess = timerMessageProcess.start(); | ||||||
|                              |  | ||||||
|                             LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length); |                             LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length); | ||||||
|  |  | ||||||
|                             if (payload[0] == 0x78) { |                             if (payload[0] == 0x78) { | ||||||
| @@ -223,60 +227,27 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven | |||||||
|                                 payload = ZlibUtil.decompress(payload); |                                 payload = ZlibUtil.decompress(payload); | ||||||
|                             } |                             } | ||||||
|  |  | ||||||
|  |  | ||||||
|                             // attempt to parse the message as protobuf |                             // attempt to parse the message as protobuf | ||||||
|                             MessageOrBuilder encodedMsg = null; |                             MessageOrBuilder encodedMsg = null; | ||||||
|                             try { |                             try { | ||||||
|  |                                 // Only supported protobuf on the TIP opensync APs is Report | ||||||
|                                 encodedMsg = Report.parseFrom(payload); |                                 encodedMsg = Report.parseFrom(payload); | ||||||
|  |                                 mqttMsg.ack(); | ||||||
|                                 MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), |                                 MQTT_LOG.info("topic = {}\nReport = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||||
|                                         jsonPrinter.print(encodedMsg)); |  | ||||||
|  |  | ||||||
|                                 extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg); |                                 extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg); | ||||||
|  |  | ||||||
|                             } catch (Exception e) { |                             } catch (Exception e) { | ||||||
|                                 try { |                                 String msgStr = new String(mqttMsg.getPayload(), utf8); | ||||||
|                                     // not a opensync_stats report, attempt to |                                 LOG.warn("Could not process message topic = {}\nmessage = {}", mqttMsg.getTopic(), msgStr); | ||||||
|                                     // deserialize as network_metadata |  | ||||||
|                                     encodedMsg = FlowReport.parseFrom(payload); |  | ||||||
|  |  | ||||||
|                                     MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(), |  | ||||||
|                                             jsonPrinter.print(encodedMsg)); |  | ||||||
|  |  | ||||||
|                                     extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), |  | ||||||
|                                             (FlowReport) encodedMsg); |  | ||||||
|                                 } catch (Exception e1) { |  | ||||||
|  |  | ||||||
|                                     try { |  | ||||||
|                                         // not a opensync_stats report and not |  | ||||||
|                                         // network_metadata report, attempt to |  | ||||||
|                                         // deserialize as WCStatsReport |  | ||||||
|                                         encodedMsg = WCStatsReport.parseFrom(payload); |  | ||||||
|  |  | ||||||
|                                         MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(), |  | ||||||
|                                                 jsonPrinter.print(encodedMsg)); |  | ||||||
|  |  | ||||||
|  |  | ||||||
|                                         extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), |  | ||||||
|                                                 (WCStatsReport) encodedMsg); |  | ||||||
|                                     } catch (Exception e2) { |  | ||||||
|                                         String msgStr = new String(mqttMsg.getPayload(), utf8); |  | ||||||
|                                         MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr); |  | ||||||
|                                     } |  | ||||||
|                                 } |  | ||||||
|                             } finally { |                             } finally { | ||||||
|                                 stopwatchTimerMessageProcess.stop(); |                                 stopwatchTimerMessageProcess.stop(); | ||||||
|                             } |                             } | ||||||
|  |  | ||||||
|                         } |                         } | ||||||
|  |  | ||||||
|                     } catch (Exception e) { |                     } catch (Exception e) { | ||||||
|                         LOG.error("Exception in MQTT receiver", e); |                         LOG.error("Exception in MQTT receiver", e); | ||||||
|                     } finally { |                     } finally { | ||||||
|                         try { |                         try { | ||||||
|                             if (futureConnection != null) { |                             if (blockingConnection != null) { | ||||||
|                                 futureConnection.disconnect(); |                                 blockingConnection.disconnect(); | ||||||
|                             } |                             } | ||||||
|                         } catch (Exception e1) { |                         } catch (Exception e1) { | ||||||
|                             // do nothing |                             // do nothing | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Mike Hansen
					Mike Hansen