mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-10-31 10:37:51 +00:00 
			
		
		
		
	Migrate to OVSDB 2.0 protos, placeholder in Dao for steering and capacity reports
This commit is contained in:
		| @@ -88,17 +88,17 @@ import com.telecominfraproject.wlan.status.models.StatusDataType; | ||||
| import com.telecominfraproject.wlan.status.network.models.NetworkAdminStatusData; | ||||
| import com.telecominfraproject.wlan.status.network.models.NetworkAggregateStatusData; | ||||
|  | ||||
| import sts.PlumeStats.Client; | ||||
| import sts.PlumeStats.ClientReport; | ||||
| import sts.PlumeStats.Device; | ||||
| import sts.PlumeStats.Device.RadioTemp; | ||||
| import sts.PlumeStats.Neighbor; | ||||
| import sts.PlumeStats.Neighbor.NeighborBss; | ||||
| import sts.PlumeStats.RadioBandType; | ||||
| import sts.PlumeStats.Report; | ||||
| import sts.PlumeStats.Survey; | ||||
| import sts.PlumeStats.Survey.SurveySample; | ||||
| import sts.PlumeStats.SurveyType; | ||||
| import sts.OpensyncStats.Client; | ||||
| import sts.OpensyncStats.ClientReport; | ||||
| import sts.OpensyncStats.Device; | ||||
| import sts.OpensyncStats.Device.RadioTemp; | ||||
| import sts.OpensyncStats.Neighbor; | ||||
| import sts.OpensyncStats.Neighbor.NeighborBss; | ||||
| import sts.OpensyncStats.RadioBandType; | ||||
| import sts.OpensyncStats.Report; | ||||
| import sts.OpensyncStats.Survey; | ||||
| import sts.OpensyncStats.Survey.SurveySample; | ||||
| import sts.OpensyncStats.SurveyType; | ||||
| import traffic.NetworkMetadata.FlowReport; | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; | ||||
|  | ||||
| @@ -926,13 +926,16 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 				nr.setSsid(nBss.getSsid()); | ||||
| 			} | ||||
|  | ||||
| 			LOG.debug("populateNeighbourScanReports created report {} from stats {}", neighbourScanReports, neighbor); | ||||
| 			if (LOG.isDebugEnabled()) { | ||||
| 				LOG.debug("populateNeighbourScanReports created report {} from stats {}", neighbourScanReports, | ||||
| 						neighbor); | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	private void handleClientSessionUpdate(int customerId, long equipmentId, String apId, long locationId, int channel, | ||||
| 			RadioBandType band, long timestamp, sts.PlumeStats.Client client) { | ||||
| 			RadioBandType band, long timestamp, sts.OpensyncStats.Client client) { | ||||
|  | ||||
| 		com.telecominfraproject.wlan.client.models.Client clientInstance = clientServiceInterface.getOrNull(customerId, | ||||
| 				new MacAddress(client.getMacAddress())); | ||||
| @@ -1007,8 +1010,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 			clientSession.getDetails().setMetricDetails(metricDetails); | ||||
|  | ||||
| 			clientSession = clientServiceInterface.updateSession(clientSession); | ||||
| 			if (clientSession != null) | ||||
| 				LOG.debug("CreatedOrUpdated clientSession {}", clientSession); | ||||
| 			LOG.debug("CreatedOrUpdated clientSession {}", clientSession); | ||||
|  | ||||
| 		} catch (Exception e) { | ||||
| 			LOG.error("Error while attempting to create ClientSession and Info", e); | ||||
| @@ -1038,7 +1040,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 			if (smr.getCreatedTimestamp() < clientReport.getTimestampMs()) { | ||||
| 				smr.setCreatedTimestamp(clientReport.getTimestampMs()); | ||||
| 			} | ||||
| 			 | ||||
|  | ||||
| 			long txBytes = 0; | ||||
| 			long rxBytes = 0; | ||||
| 			int txErrors = 0; | ||||
| @@ -1109,7 +1111,9 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
|  | ||||
| 		} | ||||
|  | ||||
| 		LOG.debug("Created ApSsidMetrics Report {}", apSsidMetrics); | ||||
| 		LOG.debug("ApSsidMetrics {}", apSsidMetrics); | ||||
|  | ||||
| 		// LOG.debug("Created ApSsidMetrics Report {}", apSsidMetrics); | ||||
|  | ||||
| 	} | ||||
|  | ||||
| @@ -1199,6 +1203,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
|  | ||||
| 		LOG.debug("ChannelInfoReports {}", channelInfoReports); | ||||
|  | ||||
| 	} | ||||
| @@ -1215,7 +1220,6 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 			busySelf += sample.getBusySelf(); | ||||
| 			busy += sample.getBusy(); | ||||
| 			channelInfo.setChanNumber(sample.getChannel()); | ||||
| 			LOG.debug("MJH Channel {} Sample for Radio {}", sample.getChannel(), radioType); | ||||
| 		} | ||||
|  | ||||
| 		int iBSS = busyTx + busySelf; | ||||
| @@ -1329,7 +1333,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra | ||||
| 				continue; | ||||
| 			} | ||||
|  | ||||
| 			if (radioState.getAllowedChannels() != null && !radioState.getAllowedChannels().isEmpty()) { | ||||
| 			if (radioState.getAllowedChannels() != null) { | ||||
| 				apElementConfiguration = ((ApElementConfiguration) ce.getDetails()); | ||||
| 				apElementConfiguration.getRadioMap().get(radioState.getFreqBand()) | ||||
| 						.setAllowedChannels(new ArrayList<>(radioState.getAllowedChannels())); | ||||
|   | ||||
| @@ -10,7 +10,7 @@ import com.telecominfraproject.wlan.opensync.external.integration.models.Opensyn | ||||
| import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAWLANNode; | ||||
| import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncWifiAssociatedClients; | ||||
|  | ||||
| import sts.PlumeStats.Report; | ||||
| import sts.OpensyncStats.Report; | ||||
| import traffic.NetworkMetadata.FlowReport; | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; | ||||
|  | ||||
|   | ||||
| @@ -21,7 +21,7 @@ import com.telecominfraproject.wlan.opensync.external.integration.models.Opensyn | ||||
| import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAWLANNode; | ||||
| import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncWifiAssociatedClients; | ||||
|  | ||||
| import sts.PlumeStats.Report; | ||||
| import sts.OpensyncStats.Report; | ||||
| import traffic.NetworkMetadata.FlowReport; | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; | ||||
|  | ||||
|   | ||||
| @@ -27,215 +27,207 @@ import com.google.protobuf.util.JsonFormat.TypeRegistry; | ||||
| import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface; | ||||
| import com.telecominfraproject.wlan.opensync.util.ZlibUtil; | ||||
|  | ||||
| import sts.PlumeStats; | ||||
| import sts.PlumeStats.Report; | ||||
| import sts.OpensyncStats; | ||||
| import sts.OpensyncStats.Report; | ||||
| import traffic.NetworkMetadata; | ||||
| import traffic.NetworkMetadata.FlowReport; | ||||
| import wc.stats.IpDnsTelemetry; | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport;  | ||||
| import wc.stats.IpDnsTelemetry.WCStatsReport; | ||||
|  | ||||
| @Profile("mqtt_receiver") | ||||
| @Component | ||||
| public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> { | ||||
|  | ||||
|     private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class); | ||||
|      | ||||
|     private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA"); | ||||
| 	private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class); | ||||
|  | ||||
|     public static Charset utf8 = Charset.forName("UTF-8"); | ||||
|      | ||||
|     @Autowired | ||||
|     private OpensyncExternalIntegrationInterface extIntegrationInterface; | ||||
| 	private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA"); | ||||
|  | ||||
| 	public static Charset utf8 = Charset.forName("UTF-8"); | ||||
|  | ||||
|     // | ||||
|     // See https://github.com/fusesource/mqtt-client for the docs | ||||
|     // | ||||
|      | ||||
|     private boolean keepReconnecting = true; | ||||
|     private Thread mqttClientThread; | ||||
|      | ||||
|     public OpensyncMqttClient( | ||||
|             @Autowired io.netty.handler.ssl.SslContext sslContext, | ||||
|             @Value("${connectus.mqttBroker.address:testportal.123wlan.com}") | ||||
|             String mqttBrokerAddress,             | ||||
|             @Value("${connectus.mqttBroker.listenPort:1883}") | ||||
|             int mqttBrokerListenPort, | ||||
|             @Value("${connectus.mqttBroker.user:admin}") | ||||
|             String username, | ||||
|             @Value("${connectus.mqttBroker.password:admin}") | ||||
|             String password, | ||||
|             @Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") | ||||
|             String jdkKeyStoreLocation, | ||||
|             @Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") | ||||
|             String jdkKeyStorePassword,             | ||||
|             @Value("${mqtt.javax.net.ssl.trustStore:/opt/tip-wlan/certs/truststore.jks}") | ||||
|             String jdkTrustStoreLocation, | ||||
|             @Value("${mqtt.javax.net.ssl.trustStorePassword:mypassword}")            | ||||
|             String jdkTrustStorePassword | ||||
|             ){ | ||||
|          | ||||
|     	if(System.getProperty("javax.net.ssl.keyStore") == null) { | ||||
|     		System.setProperty("javax.net.ssl.keyStore", jdkKeyStoreLocation); | ||||
|     	} | ||||
| 	@Autowired | ||||
| 	private OpensyncExternalIntegrationInterface extIntegrationInterface; | ||||
|  | ||||
|     	if(System.getProperty("javax.net.ssl.keyStorePassword") == null) { | ||||
|     		System.setProperty("javax.net.ssl.keyStorePassword", jdkKeyStorePassword); | ||||
|     	} | ||||
| 	// | ||||
| 	// See https://github.com/fusesource/mqtt-client for the docs | ||||
| 	// | ||||
|  | ||||
|     	if(System.getProperty("javax.net.ssl.trustStore") == null) { | ||||
|     		System.setProperty("javax.net.ssl.trustStore", jdkTrustStoreLocation); | ||||
|     	} | ||||
| 	private boolean keepReconnecting = true; | ||||
| 	private Thread mqttClientThread; | ||||
|  | ||||
|     	if(System.getProperty("javax.net.ssl.trustStorePassword") == null) { | ||||
|     		System.setProperty("javax.net.ssl.trustStorePassword", jdkTrustStorePassword); | ||||
|     	} | ||||
| 	public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext, | ||||
| 			@Value("${connectus.mqttBroker.address:testportal.123wlan.com}") String mqttBrokerAddress, | ||||
| 			@Value("${connectus.mqttBroker.listenPort:1883}") int mqttBrokerListenPort, | ||||
| 			@Value("${connectus.mqttBroker.user:admin}") String username, | ||||
| 			@Value("${connectus.mqttBroker.password:admin}") String password, | ||||
| 			@Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation, | ||||
| 			@Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword, | ||||
| 			@Value("${mqtt.javax.net.ssl.trustStore:/opt/tip-wlan/certs/truststore.jks}") String jdkTrustStoreLocation, | ||||
| 			@Value("${mqtt.javax.net.ssl.trustStorePassword:mypassword}") String jdkTrustStorePassword) { | ||||
|  | ||||
|         Runnable mqttClientRunnable = () -> { | ||||
|             while(keepReconnecting) { | ||||
|                 BlockingConnection connection = null; | ||||
|                 try { | ||||
|                     Thread.sleep(5000); | ||||
|                      | ||||
|                     // Create a new MQTT connection to the broker.   | ||||
|                     /* | ||||
|                      * Using SSL connections | ||||
|                      * If you want to connect over SSL/TLS instead of | ||||
|                      * TCP, use an "ssl://" or "tls://" URI prefix instead of "tcp://" for | ||||
|                      * the host field. | ||||
|                      * Supported protocol values are: | ||||
|                      *  | ||||
|                      * ssl:// - Use the JVM default version of the SSL algorithm.  | ||||
|                      * sslv*:// - Use a specific SSL version where * is a version supported by your JVM. Example: sslv3  | ||||
|                      * tls:// - Use the JVM default version of the TLS algorithm.  | ||||
|                      * tlsv*:// - Use a specific TLS version where * is a version supported by your JVM. Example: tlsv1.1  | ||||
|                      * The client will use the | ||||
|                      * default JVM SSLContext which is configured via JVM system properties | ||||
|                      * unless you configure the MQTT instance using the setSslContext method. | ||||
|                      *  | ||||
|                      * SSL connections perform blocking operations against internal thread | ||||
|                      * pool unless you call the setBlockingExecutor method to configure that | ||||
|                      * executor they will use instead. | ||||
|                      *  | ||||
|                      */ | ||||
| 		if (System.getProperty("javax.net.ssl.keyStore") == null) { | ||||
| 			System.setProperty("javax.net.ssl.keyStore", jdkKeyStoreLocation); | ||||
| 		} | ||||
|  | ||||
|                     MQTT mqtt = new MQTT(); | ||||
|                     //mqtt.setHost("tcp://192.168.0.137:61616"); | ||||
|                     mqtt.setHost("tls://"+mqttBrokerAddress+":"+mqttBrokerListenPort); | ||||
|                     LOG.info("Connecting to MQTT broker at {}", mqtt.getHost()); | ||||
|                     mqtt.setClientId("opensync_mqtt"); | ||||
|                     mqtt.setUserName(username); | ||||
|                     mqtt.setPassword(password); | ||||
|                     //Note: the following does not work with the serverContext, it has to be the clientContext | ||||
|                     //mqtt.setSslContext(((JdkSslContext) sslContext).context()); | ||||
|                     //For now we'll rely on regular SSLContext from the JDK | ||||
|                      | ||||
|                     //TODO: revisit this blocking connection, change it to futureConnection | ||||
|                     connection = mqtt.blockingConnection(); | ||||
|                     connection.connect(); | ||||
|                     LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); | ||||
|              | ||||
|                     // Subscribe to topics: | ||||
|                     // | ||||
|                     //      new Topic("mqtt/example/publish", QoS.AT_LEAST_ONCE),  | ||||
|                     //      new Topic("#", QoS.AT_LEAST_ONCE), | ||||
|                     //      new Topic("test/#", QoS.EXACTLY_ONCE), | ||||
|                     //      new Topic("foo/+/bar", QoS.AT_LEAST_ONCE) | ||||
|                     Topic[] topics = {  | ||||
|                             new Topic("#", QoS.AT_LEAST_ONCE), | ||||
|                             }; | ||||
|                      | ||||
|                     connection.subscribe(topics); | ||||
|                     LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); | ||||
|              | ||||
|                     //prepare a JSONPrinter to format protobuf messages as json | ||||
|                     List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
|                     protobufDescriptors.addAll(PlumeStats.getDescriptor().getMessageTypes()); | ||||
|                     protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); | ||||
|                     protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); | ||||
|                     TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
|                     JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry ); | ||||
|                      | ||||
|                     //main loop - receive messages | ||||
|                     while(true) { | ||||
|                         Message mqttMsg = connection.receive(5, TimeUnit.SECONDS); | ||||
|                          | ||||
|                         if(mqttMsg == null) { | ||||
|                             continue; | ||||
|                         } | ||||
|                          | ||||
|                         byte payload[] = mqttMsg.getPayload(); | ||||
|                         //we acknowledge right after receive because: | ||||
|                         // a. none of the stats messages are so important that we cannot skip one | ||||
|                         // b. if there's some kind of problem with the message (decoding or processing) - we want to move on as quickly as possible and not let it get stuck in the queue | ||||
|                         mqttMsg.ack(); | ||||
|                                  | ||||
|                         LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length); | ||||
|          | ||||
|                         if(payload[0]==0x78) { | ||||
|                             //looks like zlib-compressed data, let's decompress it before deserializing | ||||
|                             payload = ZlibUtil.decompress(payload); | ||||
|                         } | ||||
| 		if (System.getProperty("javax.net.ssl.keyStorePassword") == null) { | ||||
| 			System.setProperty("javax.net.ssl.keyStorePassword", jdkKeyStorePassword); | ||||
| 		} | ||||
|  | ||||
|                         //attempt to parse the message as protobuf | ||||
|                         MessageOrBuilder encodedMsg = null; | ||||
|                         try { | ||||
| 		if (System.getProperty("javax.net.ssl.trustStore") == null) { | ||||
| 			System.setProperty("javax.net.ssl.trustStore", jdkTrustStoreLocation); | ||||
| 		} | ||||
|  | ||||
|                             encodedMsg = Report.parseFrom(payload); | ||||
|                             MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||
|                             extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg); | ||||
| 		if (System.getProperty("javax.net.ssl.trustStorePassword") == null) { | ||||
| 			System.setProperty("javax.net.ssl.trustStorePassword", jdkTrustStorePassword); | ||||
| 		} | ||||
|  | ||||
|                         }catch(Exception e) { | ||||
|                             try { | ||||
|                                 //not a plume_stats report, attempt to deserialize as network_metadata | ||||
|                                 encodedMsg = FlowReport.parseFrom(payload); | ||||
|                                 MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||
|                                 extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (FlowReport) encodedMsg); | ||||
|                             }catch(Exception e1) { | ||||
|                                  | ||||
|                                 try { | ||||
|                                     //not a plume_stats report and not network_metadata report, attempt to deserialize as WCStatsReport | ||||
|                                     encodedMsg = WCStatsReport.parseFrom(payload); | ||||
|                                     MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||
|                                     extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (WCStatsReport) encodedMsg); | ||||
|                                 }catch(Exception e2) { | ||||
|                                     String msgStr = new String(mqttMsg.getPayload(), utf8); | ||||
|                                     MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr); | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                              | ||||
|                     } | ||||
|                                          | ||||
|                 } catch(Exception e) { | ||||
|                     LOG.error("Exception in MQTT receiver", e); | ||||
|                 } finally { | ||||
|                     try { | ||||
|                         if(connection!=null) { | ||||
|                             connection.disconnect(); | ||||
|                         } | ||||
|                     } catch(Exception e1) { | ||||
|                         //do nothing | ||||
|                     }                     | ||||
|                 } | ||||
|             } | ||||
|                  | ||||
|         }; | ||||
| 		Runnable mqttClientRunnable = () -> { | ||||
| 			while (keepReconnecting) { | ||||
| 				BlockingConnection connection = null; | ||||
| 				try { | ||||
| 					Thread.sleep(5000); | ||||
|  | ||||
|         mqttClientThread = new Thread(mqttClientRunnable, "mqttClientThread"); | ||||
|         mqttClientThread.setDaemon(true); | ||||
|         mqttClientThread.start();         | ||||
| 					// Create a new MQTT connection to the broker. | ||||
| 					/* | ||||
| 					 * Using SSL connections If you want to connect over SSL/TLS instead of TCP, use | ||||
| 					 * an "ssl://" or "tls://" URI prefix instead of "tcp://" for the host field. | ||||
| 					 * Supported protocol values are: | ||||
| 					 *  | ||||
| 					 * ssl:// - Use the JVM default version of the SSL algorithm. sslv*:// - Use a | ||||
| 					 * specific SSL version where * is a version supported by your JVM. Example: | ||||
| 					 * sslv3 tls:// - Use the JVM default version of the TLS algorithm. tlsv*:// - | ||||
| 					 * Use a specific TLS version where * is a version supported by your JVM. | ||||
| 					 * Example: tlsv1.1 The client will use the default JVM SSLContext which is | ||||
| 					 * configured via JVM system properties unless you configure the MQTT instance | ||||
| 					 * using the setSslContext method. | ||||
| 					 *  | ||||
| 					 * SSL connections perform blocking operations against internal thread pool | ||||
| 					 * unless you call the setBlockingExecutor method to configure that executor | ||||
| 					 * they will use instead. | ||||
| 					 *  | ||||
| 					 */ | ||||
|  | ||||
|     } | ||||
| 					MQTT mqtt = new MQTT(); | ||||
| 					// mqtt.setHost("tcp://192.168.0.137:61616"); | ||||
| 					mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort); | ||||
| 					LOG.info("Connecting to MQTT broker at {}", mqtt.getHost()); | ||||
| 					mqtt.setClientId("opensync_mqtt"); | ||||
| 					mqtt.setUserName(username); | ||||
| 					mqtt.setPassword(password); | ||||
| 					// Note: the following does not work with the serverContext, it has to be the | ||||
| 					// clientContext | ||||
| 					// mqtt.setSslContext(((JdkSslContext) sslContext).context()); | ||||
| 					// For now we'll rely on regular SSLContext from the JDK | ||||
|  | ||||
| 					// TODO: revisit this blocking connection, change it to futureConnection | ||||
| 					connection = mqtt.blockingConnection(); | ||||
| 					connection.connect(); | ||||
| 					LOG.info("Connected to MQTT broker at {}", mqtt.getHost()); | ||||
|  | ||||
|     @Override | ||||
|     public void onApplicationEvent(ContextClosedEvent event) { | ||||
|         LOG.debug("Processing ContextClosedEvent event"); | ||||
|         keepReconnecting = false; | ||||
|          | ||||
|         if(mqttClientThread!=null) { | ||||
|             mqttClientThread.interrupt(); | ||||
|         } | ||||
|     } | ||||
| 					// Subscribe to topics: | ||||
| 					// | ||||
| 					// new Topic("mqtt/example/publish", QoS.AT_LEAST_ONCE), | ||||
| 					// new Topic("#", QoS.AT_LEAST_ONCE), | ||||
| 					// new Topic("test/#", QoS.EXACTLY_ONCE), | ||||
| 					// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE) | ||||
| 					Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), }; | ||||
|  | ||||
| 					connection.subscribe(topics); | ||||
| 					LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics)); | ||||
|  | ||||
| 					// prepare a JSONPrinter to format protobuf messages as json | ||||
| 					List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>(); | ||||
| 					protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes()); | ||||
| 					protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes()); | ||||
| 					protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes()); | ||||
| 					TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build(); | ||||
| 					JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields() | ||||
| 							.omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry); | ||||
|  | ||||
| 					// main loop - receive messages | ||||
| 					while (true) { | ||||
| 						Message mqttMsg = connection.receive(5, TimeUnit.SECONDS); | ||||
|  | ||||
| 						if (mqttMsg == null) { | ||||
| 							continue; | ||||
| 						} | ||||
|  | ||||
| 						byte payload[] = mqttMsg.getPayload(); | ||||
| 						// we acknowledge right after receive because: | ||||
| 						// a. none of the stats messages are so important that we cannot skip one | ||||
| 						// b. if there's some kind of problem with the message (decoding or processing) | ||||
| 						// - we want to move on as quickly as possible and not let it get stuck in the | ||||
| 						// queue | ||||
| 						mqttMsg.ack(); | ||||
|  | ||||
| 						LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length); | ||||
|  | ||||
| 						if (payload[0] == 0x78) { | ||||
| 							// looks like zlib-compressed data, let's decompress it before deserializing | ||||
| 							payload = ZlibUtil.decompress(payload); | ||||
| 						} | ||||
|  | ||||
| 						// attempt to parse the message as protobuf | ||||
| 						MessageOrBuilder encodedMsg = null; | ||||
| 						try { | ||||
|  | ||||
| 							encodedMsg = Report.parseFrom(payload); | ||||
| 							MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg)); | ||||
| 							extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg); | ||||
|  | ||||
| 						} catch (Exception e) { | ||||
| 							try { | ||||
| 								// not a plume_stats report, attempt to deserialize as network_metadata | ||||
| 								encodedMsg = FlowReport.parseFrom(payload); | ||||
| 								MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(), | ||||
| 										jsonPrinter.print(encodedMsg)); | ||||
| 								extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (FlowReport) encodedMsg); | ||||
| 							} catch (Exception e1) { | ||||
|  | ||||
| 								try { | ||||
| 									// not a plume_stats report and not network_metadata report, attempt to | ||||
| 									// deserialize as WCStatsReport | ||||
| 									encodedMsg = WCStatsReport.parseFrom(payload); | ||||
| 									MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(), | ||||
| 											jsonPrinter.print(encodedMsg)); | ||||
| 									extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), | ||||
| 											(WCStatsReport) encodedMsg); | ||||
| 								} catch (Exception e2) { | ||||
| 									String msgStr = new String(mqttMsg.getPayload(), utf8); | ||||
| 									MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr); | ||||
| 								} | ||||
| 							} | ||||
| 						} | ||||
|  | ||||
| 					} | ||||
|  | ||||
| 				} catch (Exception e) { | ||||
| 					LOG.error("Exception in MQTT receiver", e); | ||||
| 				} finally { | ||||
| 					try { | ||||
| 						if (connection != null) { | ||||
| 							connection.disconnect(); | ||||
| 						} | ||||
| 					} catch (Exception e1) { | ||||
| 						// do nothing | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 		}; | ||||
|  | ||||
| 		mqttClientThread = new Thread(mqttClientRunnable, "mqttClientThread"); | ||||
| 		mqttClientThread.setDaemon(true); | ||||
| 		mqttClientThread.start(); | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	@Override | ||||
| 	public void onApplicationEvent(ContextClosedEvent event) { | ||||
| 		LOG.debug("Processing ContextClosedEvent event"); | ||||
| 		keepReconnecting = false; | ||||
|  | ||||
| 		if (mqttClientThread != null) { | ||||
| 			mqttClientThread.interrupt(); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -2309,6 +2309,10 @@ public class OvsdbDao { | ||||
|  | ||||
| 			provisionWifiStatsConfigClient(radioConfigs, getProvisionedWifiStatsConfigs(ovsdbClient), operations); | ||||
|  | ||||
| 			provisionWifiStatsConfigBandSteering(radioConfigs, getProvisionedWifiStatsConfigs(ovsdbClient), operations); | ||||
|  | ||||
| 			provisionWifiStatsConfigCapacity(radioConfigs, getProvisionedWifiStatsConfigs(ovsdbClient), operations); | ||||
|  | ||||
| 			provisionWifiStatsRssi(radioConfigs, getProvisionedWifiStatsConfigs(ovsdbClient), operations); | ||||
|  | ||||
| 			if (!operations.isEmpty()) { | ||||
| @@ -2329,6 +2333,50 @@ public class OvsdbDao { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	private void provisionWifiStatsConfigCapacity(Map<String, WifiRadioConfigInfo> radioConfigs, | ||||
| 			Map<String, WifiStatsConfigInfo> provisionedWifiStatsConfigs, List<Operation> operations) { | ||||
|  | ||||
| 		radioConfigs.values().stream().forEach(rc -> { | ||||
| 			if (!provisionedWifiStatsConfigs.containsKey(rc.freqBand + "_capacity")) { | ||||
| 				// | ||||
| 				Map<String, Value> rowColumns = new HashMap<>(); | ||||
| 				rowColumns.put("radio_type", new Atom<>(rc.freqBand)); | ||||
| 				rowColumns.put("reporting_interval", new Atom<>(60)); | ||||
| //				rowColumns.put("sampling_interval", new Atom<>(3)); | ||||
| 				rowColumns.put("stats_type", new Atom<>("capacity")); | ||||
| //				rowColumns.put("survey_interval_ms", new Atom<>(65)); | ||||
| //				rowColumns.put("survey_type", new Atom<>("on-chan")); | ||||
|  | ||||
| 				Row updateRow = new Row(rowColumns); | ||||
| 				operations.add(new Insert(wifiStatsConfigDbTable, updateRow)); | ||||
|  | ||||
| 			} | ||||
| 		}); | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	private void provisionWifiStatsConfigBandSteering(Map<String, WifiRadioConfigInfo> radioConfigs, | ||||
| 			Map<String, WifiStatsConfigInfo> provisionedWifiStatsConfigs, List<Operation> operations) { | ||||
|  | ||||
| 		radioConfigs.values().stream().forEach(rc -> { | ||||
| 			if (!provisionedWifiStatsConfigs.containsKey(rc.freqBand + "_steering")) { | ||||
| 				// | ||||
| 				Map<String, Value> rowColumns = new HashMap<>(); | ||||
| 				rowColumns.put("radio_type", new Atom<>(rc.freqBand)); | ||||
| 				rowColumns.put("reporting_interval", new Atom<>(60)); | ||||
| //				rowColumns.put("sampling_interval", new Atom<>(3)); | ||||
| 				rowColumns.put("stats_type", new Atom<>("steering")); | ||||
| //				rowColumns.put("survey_interval_ms", new Atom<>(65)); | ||||
| //				rowColumns.put("survey_type", new Atom<>("on-chan")); | ||||
|  | ||||
| 				Row updateRow = new Row(rowColumns); | ||||
| 				operations.add(new Insert(wifiStatsConfigDbTable, updateRow)); | ||||
|  | ||||
| 			} | ||||
| 		}); | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	private void provisionWifiStatsRssi(Map<String, WifiRadioConfigInfo> radioConfigs, | ||||
| 			Map<String, WifiStatsConfigInfo> provisionedWifiStatsConfigs, List<Operation> operations) { | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Mike Hansen
					Mike Hansen