Retrieve Radio State information on initial AP connect

This commit is contained in:
Mike Hansen
2020-06-23 17:46:31 -04:00
parent 2be3559547
commit 82332611ce
6 changed files with 911 additions and 772 deletions

View File

@@ -38,196 +38,221 @@ import wc.stats.IpDnsTelemetry.WCStatsReport;
@Component
public class OpensyncMqttClient implements ApplicationListener<ContextClosedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class);
private static final Logger LOG = LoggerFactory.getLogger(OpensyncMqttClient.class);
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
private static final Logger MQTT_LOG = LoggerFactory.getLogger("MQTT_DATA");
public static Charset utf8 = Charset.forName("UTF-8");
public static Charset utf8 = Charset.forName("UTF-8");
@Autowired
private OpensyncExternalIntegrationInterface extIntegrationInterface;
@Autowired
private OpensyncExternalIntegrationInterface extIntegrationInterface;
//
// See https://github.com/fusesource/mqtt-client for the docs
//
//
// See https://github.com/fusesource/mqtt-client for the docs
//
private boolean keepReconnecting = true;
private Thread mqttClientThread;
private boolean keepReconnecting = true;
private Thread mqttClientThread;
public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext,
@Value("${connectus.mqttBroker.address:testportal.123wlan.com}") String mqttBrokerAddress,
@Value("${connectus.mqttBroker.listenPort:1883}") int mqttBrokerListenPort,
@Value("${connectus.mqttBroker.user:admin}") String username,
@Value("${connectus.mqttBroker.password:admin}") String password,
@Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation,
@Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword,
@Value("${mqtt.javax.net.ssl.trustStore:/opt/tip-wlan/certs/truststore.jks}") String jdkTrustStoreLocation,
@Value("${mqtt.javax.net.ssl.trustStorePassword:mypassword}") String jdkTrustStorePassword) {
public OpensyncMqttClient(@Autowired io.netty.handler.ssl.SslContext sslContext,
@Value("${connectus.mqttBroker.address:testportal.123wlan.com}") String mqttBrokerAddress,
@Value("${connectus.mqttBroker.listenPort:1883}") int mqttBrokerListenPort,
@Value("${connectus.mqttBroker.user:admin}") String username,
@Value("${connectus.mqttBroker.password:admin}") String password,
@Value("${mqtt.javax.net.ssl.keyStore:/opt/tip-wlan/certs/client_keystore.jks}") String jdkKeyStoreLocation,
@Value("${mqtt.javax.net.ssl.keyStorePassword:mypassword}") String jdkKeyStorePassword,
@Value("${mqtt.javax.net.ssl.trustStore:/opt/tip-wlan/certs/truststore.jks}") String jdkTrustStoreLocation,
@Value("${mqtt.javax.net.ssl.trustStorePassword:mypassword}") String jdkTrustStorePassword) {
if (System.getProperty("javax.net.ssl.keyStore") == null) {
System.setProperty("javax.net.ssl.keyStore", jdkKeyStoreLocation);
}
if (System.getProperty("javax.net.ssl.keyStore") == null) {
System.setProperty("javax.net.ssl.keyStore", jdkKeyStoreLocation);
}
if (System.getProperty("javax.net.ssl.keyStorePassword") == null) {
System.setProperty("javax.net.ssl.keyStorePassword", jdkKeyStorePassword);
}
if (System.getProperty("javax.net.ssl.keyStorePassword") == null) {
System.setProperty("javax.net.ssl.keyStorePassword", jdkKeyStorePassword);
}
if (System.getProperty("javax.net.ssl.trustStore") == null) {
System.setProperty("javax.net.ssl.trustStore", jdkTrustStoreLocation);
}
if (System.getProperty("javax.net.ssl.trustStore") == null) {
System.setProperty("javax.net.ssl.trustStore", jdkTrustStoreLocation);
}
if (System.getProperty("javax.net.ssl.trustStorePassword") == null) {
System.setProperty("javax.net.ssl.trustStorePassword", jdkTrustStorePassword);
}
if (System.getProperty("javax.net.ssl.trustStorePassword") == null) {
System.setProperty("javax.net.ssl.trustStorePassword", jdkTrustStorePassword);
}
Runnable mqttClientRunnable = () -> {
while (keepReconnecting) {
BlockingConnection connection = null;
try {
Thread.sleep(5000);
Runnable mqttClientRunnable = new Runnable() {
@Override
public void run() {
while (keepReconnecting) {
BlockingConnection connection = null;
try {
Thread.sleep(5000);
// Create a new MQTT connection to the broker.
/*
* Using SSL connections If you want to connect over SSL/TLS instead of TCP, use
* an "ssl://" or "tls://" URI prefix instead of "tcp://" for the host field.
* Supported protocol values are:
*
* ssl:// - Use the JVM default version of the SSL algorithm. sslv*:// - Use a
* specific SSL version where * is a version supported by your JVM. Example:
* sslv3 tls:// - Use the JVM default version of the TLS algorithm. tlsv*:// -
* Use a specific TLS version where * is a version supported by your JVM.
* Example: tlsv1.1 The client will use the default JVM SSLContext which is
* configured via JVM system properties unless you configure the MQTT instance
* using the setSslContext method.
*
* SSL connections perform blocking operations against internal thread pool
* unless you call the setBlockingExecutor method to configure that executor
* they will use instead.
*
*/
// Create a new MQTT connection to the broker.
/*
* Using SSL connections If you want to connect over
* SSL/TLS instead of TCP, use an "ssl://" or "tls://"
* URI prefix instead of "tcp://" for the host field.
* Supported protocol values are:
*
* ssl:// - Use the JVM default version of the SSL
* algorithm. sslv*:// - Use a specific SSL version
* where * is a version supported by your JVM. Example:
* sslv3 tls:// - Use the JVM default version of the TLS
* algorithm. tlsv*:// - Use a specific TLS version
* where * is a version supported by your JVM. Example:
* tlsv1.1 The client will use the default JVM
* SSLContext which is configured via JVM system
* properties unless you configure the MQTT instance
* using the setSslContext method.
*
* SSL connections perform blocking operations against
* internal thread pool unless you call the
* setBlockingExecutor method to configure that executor
* they will use instead.
*
*/
MQTT mqtt = new MQTT();
// mqtt.setHost("tcp://192.168.0.137:61616");
mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort);
LOG.info("Connecting to MQTT broker at {}", mqtt.getHost());
mqtt.setClientId("opensync_mqtt");
mqtt.setUserName(username);
mqtt.setPassword(password);
// Note: the following does not work with the serverContext, it has to be the
// clientContext
// mqtt.setSslContext(((JdkSslContext) sslContext).context());
// For now we'll rely on regular SSLContext from the JDK
MQTT mqtt = new MQTT();
// mqtt.setHost("tcp://192.168.0.137:61616");
mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort);
LOG.info("Connecting to MQTT broker at {}", mqtt.getHost());
mqtt.setClientId("opensync_mqtt");
mqtt.setUserName(username);
mqtt.setPassword(password);
// Note: the following does not work with the
// serverContext,
// it has to be the
// clientContext
// mqtt.setSslContext(((JdkSslContext)
// sslContext).context());
// For now we'll rely on regular SSLContext from the JDK
// TODO: revisit this blocking connection, change it to futureConnection
connection = mqtt.blockingConnection();
connection.connect();
LOG.info("Connected to MQTT broker at {}", mqtt.getHost());
// TODO: revisit this blocking connection, change it to
// futureConnection
connection = mqtt.blockingConnection();
connection.connect();
LOG.info("Connected to MQTT broker at {}", mqtt.getHost());
// Subscribe to topics:
//
// new Topic("mqtt/example/publish", QoS.AT_LEAST_ONCE),
// new Topic("#", QoS.AT_LEAST_ONCE),
// new Topic("test/#", QoS.EXACTLY_ONCE),
// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE)
Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), };
// Subscribe to topics:
//
// new Topic("mqtt/example/publish", QoS.AT_LEAST_ONCE),
// new Topic("#", QoS.AT_LEAST_ONCE),
// new Topic("test/#", QoS.EXACTLY_ONCE),
// new Topic("foo/+/bar", QoS.AT_LEAST_ONCE)
Topic[] topics = { new Topic("#", QoS.AT_LEAST_ONCE), };
connection.subscribe(topics);
LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
connection.subscribe(topics);
LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
// prepare a JSONPrinter to format protobuf messages as json
List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>();
protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes());
protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes());
protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes());
TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build();
JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields()
.omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry);
// prepare a JSONPrinter to format protobuf messages as
// json
List<Descriptors.Descriptor> protobufDescriptors = new ArrayList<>();
protobufDescriptors.addAll(OpensyncStats.getDescriptor().getMessageTypes());
protobufDescriptors.addAll(IpDnsTelemetry.getDescriptor().getMessageTypes());
protobufDescriptors.addAll(NetworkMetadata.getDescriptor().getMessageTypes());
TypeRegistry oldRegistry = TypeRegistry.newBuilder().add(protobufDescriptors).build();
JsonFormat.Printer jsonPrinter = JsonFormat.printer().includingDefaultValueFields()
.omittingInsignificantWhitespace().usingTypeRegistry(oldRegistry);
// main loop - receive messages
while (true) {
Message mqttMsg = connection.receive(5, TimeUnit.SECONDS);
// main loop - receive messages
while (true) {
Message mqttMsg = connection.receive(5, TimeUnit.SECONDS);
if (mqttMsg == null) {
continue;
}
if (mqttMsg == null) {
continue;
}
byte payload[] = mqttMsg.getPayload();
// we acknowledge right after receive because:
// a. none of the stats messages are so important that we cannot skip one
// b. if there's some kind of problem with the message (decoding or processing)
// - we want to move on as quickly as possible and not let it get stuck in the
// queue
mqttMsg.ack();
byte payload[] = mqttMsg.getPayload();
// we acknowledge right after receive because:
// a. none of the stats messages are so important
// that
// we cannot skip one
// b. if there's some kind of problem with the
// message
// (decoding or processing)
// - we want to move on as quickly as possible and
// not
// let it get stuck in the
// queue
mqttMsg.ack();
LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length);
LOG.trace("received message on topic {} size {}", mqttMsg.getTopic(), payload.length);
if (payload[0] == 0x78) {
// looks like zlib-compressed data, let's decompress it before deserializing
payload = ZlibUtil.decompress(payload);
}
if (payload[0] == 0x78) {
// looks like zlib-compressed data, let's
// decompress
// it before deserializing
payload = ZlibUtil.decompress(payload);
}
// attempt to parse the message as protobuf
MessageOrBuilder encodedMsg = null;
try {
// attempt to parse the message as protobuf
MessageOrBuilder encodedMsg = null;
try {
encodedMsg = Report.parseFrom(payload);
MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg);
encodedMsg = Report.parseFrom(payload);
MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(),
jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg);
} catch (Exception e) {
try {
// not a plume_stats report, attempt to deserialize as network_metadata
encodedMsg = FlowReport.parseFrom(payload);
MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(),
jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (FlowReport) encodedMsg);
} catch (Exception e1) {
} catch (Exception e) {
try {
// not a plume_stats report, attempt to
// deserialize as network_metadata
encodedMsg = FlowReport.parseFrom(payload);
MQTT_LOG.info("topic = {} FlowReport = {}", mqttMsg.getTopic(),
jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(),
(FlowReport) encodedMsg);
} catch (Exception e1) {
try {
// not a plume_stats report and not network_metadata report, attempt to
// deserialize as WCStatsReport
encodedMsg = WCStatsReport.parseFrom(payload);
MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(),
jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(),
(WCStatsReport) encodedMsg);
} catch (Exception e2) {
String msgStr = new String(mqttMsg.getPayload(), utf8);
MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr);
}
}
}
try {
// not a plume_stats report and not
// network_metadata report, attempt to
// deserialize as WCStatsReport
encodedMsg = WCStatsReport.parseFrom(payload);
MQTT_LOG.info("topic = {} IpDnsTelemetry = {}", mqttMsg.getTopic(),
jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(),
(WCStatsReport) encodedMsg);
} catch (Exception e2) {
String msgStr = new String(mqttMsg.getPayload(), utf8);
MQTT_LOG.info("topic = {} message = {}", mqttMsg.getTopic(), msgStr);
}
}
}
}
}
} catch (Exception e) {
LOG.error("Exception in MQTT receiver", e);
} finally {
try {
if (connection != null) {
connection.disconnect();
}
} catch (Exception e1) {
// do nothing
}
}
}
} catch (Exception e) {
LOG.error("Exception in MQTT receiver", e);
} finally {
try {
if (connection != null) {
connection.disconnect();
}
} catch (Exception e1) {
// do nothing
}
}
}
};
}
};
mqttClientThread = new Thread(mqttClientRunnable, "mqttClientThread");
mqttClientThread.setDaemon(true);
mqttClientThread.start();
mqttClientThread = new Thread(mqttClientRunnable, "mqttClientThread");
mqttClientThread.setDaemon(true);
mqttClientThread.start();
}
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
LOG.debug("Processing ContextClosedEvent event");
keepReconnecting = false;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
LOG.debug("Processing ContextClosedEvent event");
keepReconnecting = false;
if (mqttClientThread != null) {
mqttClientThread.interrupt();
}
}
if (mqttClientThread != null) {
mqttClientThread.interrupt();
}
}
}

View File

@@ -97,7 +97,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
// with the serialNumber and using it as a key (equivalent
// of KDC unique qrCode)
String key = clientCn + "_" + connectNodeInfo.serialNumber;
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.newSession(key, ovsdbClient);
ovsdbSessionMapInterface.newSession(key, ovsdbClient);
extIntegrationInterface.apConnected(key, connectNodeInfo);
// push configuration to AP
@@ -105,8 +105,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
monitorOvsdbStateTables(ovsdbClient, key);
LOG.info("ovsdbClient connected from {} on port {} key {} ", remoteHost, localPort, key);
LOG.info("ovsdbClient connectedClients = {}",
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.getNumSessions());
LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
} catch (Exception e) {
LOG.error("ovsdbClient error", e);
@@ -139,7 +138,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
// so we are doing a reverse lookup here, and then if we find
// the key we will
// remove the entry from the connectedClients.
String key = ConnectusOvsdbClient.this.ovsdbSessionMapInterface.lookupClientId(ovsdbClient);
String key = ovsdbSessionMapInterface.lookupClientId(ovsdbClient);
if (key != null) {
try {
@@ -155,7 +154,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
}
try {
extIntegrationInterface.apDisconnected(key);
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.removeSession(key);
ovsdbSessionMapInterface.removeSession(key);
} catch (Exception e) {
LOG.debug("Unable to process ap disconnect. {}", e.getMessage());
} finally {
@@ -165,8 +164,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
LOG.info("ovsdbClient disconnected from {} on port {} clientCn {} key {} ", remoteHost, localPort,
clientCn, key);
LOG.info("ovsdbClient connectedClients = {}",
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.getNumSessions());
LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
}
};
@@ -182,8 +180,8 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
LOG.debug("Starting Client connect");
connectNodeInfo = ovsdbDao.updateConnectNodeInfoOnConnect(ovsdbClient, clientCn, connectNodeInfo);
String apId = clientCn + "_" + connectNodeInfo.serialNumber;
OpensyncAPConfig opensyncAPConfig = extIntegrationInterface.getApConfig(apId);
ovsdbDao.vifBridge = connectNodeInfo.ifName;
try {
ovsdbDao.provisionBridgePortInterface(ovsdbClient);
@@ -193,10 +191,12 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
}
ovsdbDao.removeAllSsids(ovsdbClient); // always
String apId = clientCn + "_" + connectNodeInfo.serialNumber;
OpensyncAPConfig opensyncAPConfig = extIntegrationInterface.getApConfig(apId);
if (opensyncAPConfig != null) {
ovsdbDao.configureWifiRadios(ovsdbClient, opensyncAPConfig);
ovsdbDao.configureSsids(ovsdbClient, opensyncAPConfig);
ovsdbDao.configureWifiInet(ovsdbClient, opensyncAPConfig);
}
ovsdbDao.configureStats(ovsdbClient);
@@ -208,8 +208,6 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
ovsdbDao.updateDeviceStatsReportingInterval(ovsdbClient, collectionIntervalSecDeviceStats);
}
// ovsdbDao.configureWifiInet(ovsdbClient);
LOG.debug("Client connect Done");
return connectNodeInfo;
}
@@ -307,22 +305,22 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
for (Entry<UUID, RowUpdate> rowUpdate : tableUpdate.getValue().getRowUpdates()
.entrySet()) {
if (rowUpdate.getValue().getOld() != null
&& rowUpdate.getValue().getNew() == null) {
if ((rowUpdate.getValue().getOld() != null)
&& (rowUpdate.getValue().getNew() == null)) {
Row row = rowUpdate.getValue().getOld();
String ifName = null;
String ssid = null;
if (row.getColumns().get("ssid") != null
if ((row.getColumns().get("ssid") != null)
&& row.getColumns().get("ssid").getClass().equals(
com.vmware.ovsdb.protocol.operation.notation.Atom.class)) {
ssid = row.getStringColumn("ssid");
}
if (row.getColumns().get("if_name") != null
if ((row.getColumns().get("if_name") != null)
&& row.getColumns().get("if_name").getClass().equals(
com.vmware.ovsdb.protocol.operation.notation.Atom.class)) {
ifName = row.getStringColumn("if_name");
}
if (ifName != null && ssid != null) {
if ((ifName != null) && (ssid != null)) {
OpensyncAPVIFState toBeDeleted = new OpensyncAPVIFState();
toBeDeleted.setSsid(ssid);
toBeDeleted.setIfName(ifName);
@@ -333,7 +331,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
}
if (tableUpdate.getValue().getRowUpdates().values().isEmpty()) {
if (tableUpdate.getValue().getRowUpdates().isEmpty()) {
tableUpdates.getTableUpdates().remove(tableUpdate.getKey());
}
@@ -378,7 +376,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
for (TableUpdate tableUpdate : tableUpdates.getTableUpdates().values()) {
for (RowUpdate rowUpdate : tableUpdate.getRowUpdates().values()) {
if (rowUpdate.getOld() != null && rowUpdate.getNew() == null) {
if ((rowUpdate.getOld() != null) && (rowUpdate.getNew() == null)) {
Row row = rowUpdate.getOld();
String deletedClientMac = row.getStringColumn("mac");
extIntegrationInterface.wifiAssociatedClientsDbTableDelete(deletedClientMac,