TW-208: MonitorCallback cleanup

This commit is contained in:
Mike Hansen
2020-04-03 13:32:56 -04:00
parent 74e5d4dde8
commit 5bd6841fef
5 changed files with 140 additions and 230 deletions

View File

@@ -155,8 +155,8 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
//attempt to parse the message as protobuf
MessageOrBuilder encodedMsg = null;
try {
encodedMsg = Report.parseFrom(payload);
MQTT_LOG.info("topic = {} Report = {}", mqttMsg.getTopic(), jsonPrinter.print(encodedMsg));
extIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), (Report) encodedMsg);

View File

@@ -1,8 +1,8 @@
package com.telecominfraproject.wlan.opensync.ovsdb;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableMap;
import com.telecominfraproject.wlan.opensync.external.integration.ConnectusOvsdbClientInterface;
import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface;
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSession;
@@ -24,9 +25,12 @@ import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao;
import com.telecominfraproject.wlan.opensync.util.SslUtil;
import com.vmware.ovsdb.callback.ConnectionCallback;
import com.vmware.ovsdb.callback.MonitorCallback;
import com.vmware.ovsdb.exception.OvsdbClientException;
import com.vmware.ovsdb.protocol.methods.MonitorRequest;
import com.vmware.ovsdb.protocol.methods.MonitorRequests;
import com.vmware.ovsdb.protocol.methods.MonitorSelect;
import com.vmware.ovsdb.protocol.methods.RowUpdate;
import com.vmware.ovsdb.protocol.methods.TableUpdates;
import com.vmware.ovsdb.protocol.operation.notation.Row;
import com.vmware.ovsdb.service.OvsdbClient;
import com.vmware.ovsdb.service.OvsdbPassiveConnectionListener;
@@ -36,6 +40,18 @@ import io.netty.handler.ssl.SslContext;
@Component
public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
private static final String[] AWLAN_NODE_DB_TABLE_COLUMNS = new String[] { "device_mode", "id", "model",
"serial_number", "firmware_version", "platform_version", "redirector_addr" };
private static final String[] WIFI_VIF_STATE_DB_TABLE_COLUMNS = new String[] { "min_hw_mode", "if_name", "security",
"bridge", "channel", "enabled", "ssid_broadcast", "mac", "ssid" };
private static final String[] WIFI_INET_STATE_DB_TABLE_COLUMNS = new String[] { "hwaddr", "if_type", "if_name",
"inet_addr", "dhcpc", "network" };
private static final String[] WIFI_RADIO_STATE_DB_TABLE_COLUMNS = new String[] { "ht_mode", "hw_mode", "hw_params",
"hw_type", "mac", "if_name", "freq_band", "country", "channel" };
private static final Logger LOG = LoggerFactory.getLogger(ConnectusOvsdbClient.class);
@org.springframework.beans.factory.annotation.Value("${connectus.ovsdb.listenPort:6640}")
@@ -66,73 +82,6 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
public void listenForConnections() {
class ConnectusMonitorCallback implements MonitorCallback {
private String connectedClientId;
public ConnectusMonitorCallback(String clientId) {
this.connectedClientId = clientId;
}
@Override
public void update(TableUpdates tableUpdates) {
Set<String> tableNames = tableUpdates.getTableUpdates().keySet();
for (String name : tableNames) {
if (name.equals(OvsdbDao.wifiAssociatedClientsDbTable)) {
Map<UUID, RowUpdate> updates = tableUpdates.getTableUpdates().get(name).getRowUpdates();
for (UUID id : updates.keySet()) {
Row row = updates.get(id).getNew();
if (row != null)
extIntegrationInterface.handleClientsChanged(row.getColumns(), connectedClientId);
}
} else if (name.equals(OvsdbDao.awlanNodeDbTable)) {
Map<UUID, RowUpdate> updates = tableUpdates.getTableUpdates().get(name).getRowUpdates();
for (UUID id : updates.keySet()) {
Row row = updates.get(id).getNew();
if (row != null)
extIntegrationInterface.awlanChanged(row.getColumns(), connectedClientId);
}
} else if (name.equals(OvsdbDao.wifiVifStateDbTable)) {
Map<UUID, RowUpdate> updates = tableUpdates.getTableUpdates().get(name).getRowUpdates();
for (UUID id : updates.keySet()) {
Row row = updates.get(id).getNew();
if (row != null)
extIntegrationInterface.wirelessStatusChanged(row.getColumns(), connectedClientId);
}
} else if (name.equals(OvsdbDao.wifiInetStateDbTable)) {
Map<UUID, RowUpdate> updates = tableUpdates.getTableUpdates().get(name).getRowUpdates();
for (UUID id : updates.keySet()) {
Row row = updates.get(id).getNew();
if (row != null)
extIntegrationInterface.networkStatusChanged(row.getColumns(), connectedClientId);
}
} else if (name.equals(OvsdbDao.wifiRadioStateDbTable)) {
Map<UUID, RowUpdate> updates = tableUpdates.getTableUpdates().get(name).getRowUpdates();
for (UUID id : updates.keySet()) {
Row row = updates.get(id).getNew();
if (row != null)
extIntegrationInterface.deviceStatusChanged(row.getColumns(), connectedClientId);
}
}
}
}
}
ConnectionCallback connectionCallback = new ConnectionCallback() {
public void connected(OvsdbClient ovsdbClient) {
String remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress();
@@ -159,20 +108,100 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
// push configuration to AP
connectNodeInfo = processConnectRequest(ovsdbClient, clientCn, connectNodeInfo);
LOG.info("ovsdbClient connected from {} on port {} key {} ", remoteHost, localPort, key);
LOG.info("ovsdbClient connectedClients = {}",
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.getNumSessions());
// monitor radio config state
ovsdbDao.monitorRadioConfigState(ovsdbClient, new ConnectusMonitorCallback(key));
// monitor inet state
ovsdbDao.monitorInetState(ovsdbClient, new ConnectusMonitorCallback(key));
// monitor vif state
ovsdbDao.monitorVIFState(ovsdbClient, new ConnectusMonitorCallback(key));
// monitor AWLAN_Node
ovsdbDao.monitorAwlanNode(ovsdbClient, new ConnectusMonitorCallback(key));
// monitor Wifi_Associated_Clients
ovsdbDao.monitorAssociatedClients(ovsdbClient, new ConnectusMonitorCallback(key));
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiRadioStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiRadioStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_RADIO_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
extIntegrationInterface.wifiRadioStatusDbTableUpdate(
rowUpdates.get(uuid).getNew().getColumns(), key);
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiInetStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiInetStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_INET_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, true, true, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
extIntegrationInterface.wifiInetStateDbTableUpdate(
rowUpdates.get(uuid).getNew().getColumns(), key);
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiVifStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiVifStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_VIF_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
extIntegrationInterface.wifiVIFStateDbTableUpdate(
rowUpdates.get(uuid).getNew().getColumns(), key);
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.awlanNodeDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.awlanNodeDbTable,
new MonitorRequest(Arrays.asList(AWLAN_NODE_DB_TABLE_COLUMNS),
new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
extIntegrationInterface.awlanChanged(
rowUpdates.get(uuid).getNew().getColumns(), key);
}
});
}
});
} catch (Exception e) {
LOG.error("ovsdbClient error", e);
@@ -211,7 +240,14 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.removeSession(key);
}
// turn off monitor
ovsdbDao.cancelMonitors(ovsdbClient);
try {
ovsdbClient.cancelMonitor(OvsdbDao.wifiRadioStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.wifiVifStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.wifiInetStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.awlanNodeDbTable);
} catch (OvsdbClientException e) {
LOG.warn("Could not cancel Monitor {}", e);
}
ovsdbClient.shutdown();
LOG.info("ovsdbClient disconnected from {} on port {} clientCn {} key {} ", remoteHost, localPort,

View File

@@ -86,131 +86,6 @@ public class OvsdbDao {
public static final String wifiAssociatedClientsDbTable = "Wifi_Associated_Clients";
private static final String wifiRadioStateDbTableMonitorId = "Wifi_Radio_State_Monitor_Id";
private static final String wifiInetStateDbTableMonitorId = "Wifi_Inet_State_Monitor_Id";
private static final String wifiVifStateDbTableMonitorId = "Wifi_VIF_State_Monitor_Id";
private static final String wifiAwlanNodeDbTableMonitorId = "AWLAN_Node_Monitor_Id";
private static final String wifiAssociatedClientsDbTableMonitorId = "Wifi_Associated_Clients_Monitor_Id";
public void monitorAwlanNode(OvsdbClient ovsdbClient, MonitorCallback monitorCallback) {
MonitorRequest monitorRequest = new MonitorRequest();
MonitorRequests monitorRequests = new MonitorRequests(ImmutableMap.of(awlanNodeDbTable, monitorRequest));
try {
ovsdbClient.monitor(ovsdbName, wifiAwlanNodeDbTableMonitorId, monitorRequests, monitorCallback);
} catch (OvsdbClientException e) {
LOG.error("Unable to add Monitor to table " + awlanNodeDbTable, e);
}
}
public void monitorAssociatedClients(OvsdbClient ovsdbClient, MonitorCallback monitorCallback) {
MonitorRequest monitorRequest = new MonitorRequest();
MonitorRequests monitorRequests = new MonitorRequests(
ImmutableMap.of(wifiAssociatedClientsDbTable, monitorRequest));
try {
ovsdbClient.monitor(ovsdbName, wifiAssociatedClientsDbTableMonitorId, monitorRequests, monitorCallback);
} catch (OvsdbClientException e) {
LOG.error("Unable to add Monitor to table " + wifiAssociatedClientsDbTable, e);
}
}
public void monitorVIFState(OvsdbClient ovsdbClient, MonitorCallback monitorCallback) {
MonitorRequest monitorRequest = new MonitorRequest();
MonitorRequests monitorRequests = new MonitorRequests(ImmutableMap.of(wifiVifStateDbTable, monitorRequest));
try {
ovsdbClient.monitor(ovsdbName, wifiVifStateDbTableMonitorId, monitorRequests, monitorCallback);
} catch (OvsdbClientException e) {
LOG.error("Unable to add Monitor to table " + wifiVifStateDbTable, e);
}
}
public void monitorInetState(OvsdbClient ovsdbClient, MonitorCallback monitorCallback) {
List<String> columns = new ArrayList<>();
columns.add("_uuid");
columns.add("_version");
columns.add("broadcast");
columns.add("dhcpc");
columns.add("enabled");
columns.add("hwaddr");
columns.add("if_name");
columns.add("if_type");
columns.add("if_uuid");
columns.add("inet_addr");
columns.add("ip_assign_scheme");
columns.add("mtu");
columns.add("netmask");
columns.add("network");
MonitorRequest monitorRequest = new MonitorRequest();
MonitorRequests monitorRequests = new MonitorRequests(ImmutableMap.of(wifiInetStateDbTable, monitorRequest));
try {
ovsdbClient.monitor(ovsdbName, wifiInetStateDbTableMonitorId, monitorRequests, monitorCallback);
} catch (OvsdbClientException e) {
LOG.error("Unable to add Monitor to table " + wifiInetStateDbTable, e);
}
}
//
// Note: When talking to OVSDB always use future.get(X, TimeUnit.SECONDS); -
// to prevent DOS attacks with misbehaving clients
//
public void cancelMonitors(OvsdbClient ovsdbClient) {
try {
ovsdbClient.cancelMonitor(wifiRadioStateDbTableMonitorId);
ovsdbClient.cancelMonitor(wifiVifStateDbTableMonitorId);
ovsdbClient.cancelMonitor(wifiInetStateDbTableMonitorId);
ovsdbClient.cancelMonitor(wifiAssociatedClientsDbTableMonitorId);
ovsdbClient.cancelMonitor(wifiAwlanNodeDbTableMonitorId);
} catch (OvsdbClientException e) {
LOG.debug("Could not cancel Monitor. {}", e.getLocalizedMessage());
}
}
public void monitorRadioConfigState(OvsdbClient ovsdbClient, MonitorCallback monitorCallback) {
List<String> columns = new ArrayList<>();
columns.add("_uuid");
columns.add("_version");
columns.add("allowed_channels");
columns.add("bcn_int");
columns.add("channel");
columns.add("channel_mode");
columns.add("channels");
columns.add("country");
columns.add("enabled");
columns.add("freq_band");
columns.add("ht_mode");
columns.add("hw_config");
columns.add("hw_mode");
columns.add("hw_params");
columns.add("if_name");
columns.add("mac");
columns.add("radio_config");
columns.add("tx_chainmask");
columns.add("tx_power");
columns.add("vif_states");
MonitorRequest monitorRequest = new MonitorRequest(columns, new MonitorSelect(true, true, true, true));
Map<String, MonitorRequest> monitorRequests = new HashMap<String, MonitorRequest>();
monitorRequests.put(wifiRadioStateDbTable, monitorRequest);
MonitorRequests requests = new MonitorRequests(monitorRequests);
try {
ovsdbClient.monitor(ovsdbName, wifiRadioStateDbTableMonitorId, requests, monitorCallback);
} catch (OvsdbClientException e) {
LOG.error("Unable to add Monitor to table " + wifiRadioStateDbTable, e);
}
}
public ConnectNodeInfo getConnectNodeInfo(OvsdbClient ovsdbClient) {
ConnectNodeInfo ret = new ConnectNodeInfo();
@@ -1719,23 +1594,22 @@ public class OvsdbDao {
//
}
for (String band : new String[] { "2.4G", "5GL", "5G" }) {
updateColumns = new HashMap<>();
updateColumns.put("radio_type", new Atom<>(band));
updateColumns.put("reporting_interval", new Atom<>(120));
updateColumns.put("sampling_interval", new Atom<>(10));
updateColumns.put("report_type", new Atom<>("average"));
updateColumns.put("stats_type", new Atom<>("rssi"));
updateColumns.put("survey_interval_ms", new Atom<>(0));
updateColumns.put("survey_type", new Atom<>("on-chan"));
row = new Row(updateColumns);
if (provisionedWifiStatsConfigs.containsKey(band + "_rssi_onChannel")) {
operations.add(new Update(wifiStatsConfigDbTable, row));
} else {
operations.add(new Insert(wifiStatsConfigDbTable, row));
}
}
// for (String band : new String[] { "2.4G", "5GL", "5G" }) {
// if (!provisionedWifiStatsConfigs.containsKey(band + "_rssi_onChannel")) {
// updateColumns = new HashMap<>();
// updateColumns.put("radio_type", new Atom<>(band));
// updateColumns.put("reporting_interval", new Atom<>(120));
// updateColumns.put("sampling_interval", new Atom<>(10));
// updateColumns.put("report_type", new Atom<>("average"));
// updateColumns.put("stats_type", new Atom<>("rssi"));
// updateColumns.put("survey_interval_ms", new Atom<>(0));
// updateColumns.put("survey_type", new Atom<>("on-chan"));
// row = new Row(updateColumns);
//
// operations.add(new Update(wifiStatsConfigDbTable, row));
// }
//
// }
if (!operations.isEmpty()) {
CompletableFuture<OperationResult[]> fResult = ovsdbClient.transact(ovsdbName, operations);