Improve TableUpdates handling from monitor callback.

This commit is contained in:
Mike Hansen
2020-04-09 11:30:59 -04:00
parent 0f0286bfcb
commit b127294205
3 changed files with 92 additions and 398 deletions

View File

@@ -1,9 +1,8 @@
package com.telecominfraproject.wlan.opensync.external.integration;
import java.util.Map;
import com.telecominfraproject.wlan.opensync.external.integration.models.ConnectNodeInfo;
import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAPConfig;
import com.vmware.ovsdb.protocol.methods.TableUpdates;
import sts.PlumeStats.Report;
import traffic.NetworkMetadata.FlowReport;
@@ -20,11 +19,11 @@ public interface OpensyncExternalIntegrationInterface {
OpensyncAPConfig getApConfig(String apId);
void wifiVIFStateDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation);
void wifiVIFStateDbTableUpdate(TableUpdates tableUpdates, String apId);
void wifiRadioStatusDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation);
void wifiRadioStatusDbTableUpdate(TableUpdates tableUpdates, String apId);
void wifiInetStateDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation);
void wifiInetStateDbTableUpdate(TableUpdates tableUpdates, String apId);
void processMqttMessage(String topic, Report report);
@@ -32,7 +31,7 @@ public interface OpensyncExternalIntegrationInterface {
void processMqttMessage(String topic, WCStatsReport wcStatsReport);
void wifiAssociatedClientsDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation);
void wifiAssociatedClientsDbTableUpdate(TableUpdates tableUpdates, String apId);
void awlan_NodeDbTableUpdate(Map<String, String> row, String connectedClientId, RowUpdateOperation operation);
void awlanNodeDbTableUpdate(TableUpdates tableUpdates, String connectedClientId);
}

View File

@@ -1,8 +1,6 @@
package com.telecominfraproject.wlan.opensync.external.integration;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
@@ -14,7 +12,7 @@ import org.springframework.stereotype.Component;
import com.telecominfraproject.wlan.opensync.external.integration.models.ConnectNodeInfo;
import com.telecominfraproject.wlan.opensync.external.integration.models.OpensyncAPConfig;
import com.vmware.ovsdb.protocol.operation.notation.Uuid;
import com.vmware.ovsdb.protocol.methods.TableUpdates;
import sts.PlumeStats.Report;
import traffic.NetworkMetadata.FlowReport;
@@ -97,34 +95,33 @@ public class OpensyncExternalIntegrationSimple implements OpensyncExternalIntegr
}
@Override
public void wifiVIFStateDbTableUpdate(Map<String, String> row,
String apId, RowUpdateOperation operation) {
public void wifiVIFStateDbTableUpdate(TableUpdates tableUpdates,
String apId) {
// TODO Auto-generated method stub
}
@Override
public void wifiRadioStatusDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation) {
public void wifiRadioStatusDbTableUpdate(TableUpdates tableUpdates, String apId) {
// TODO Auto-generated method stub
}
@Override
public void wifiInetStateDbTableUpdate(Map<String, String> row, String apId, RowUpdateOperation operation) {
public void wifiInetStateDbTableUpdate(TableUpdates tableUpdates, String apId) {
// TODO Auto-generated method stub
}
@Override
public void wifiAssociatedClientsDbTableUpdate(Map<String, String> row,
String apId, RowUpdateOperation operation) {
public void wifiAssociatedClientsDbTableUpdate(TableUpdates tableUpdates,
String apId) {
// TODO Auto-generated method stub
}
@Override
public void awlan_NodeDbTableUpdate(Map<String, String> row, String connectedClientId,
RowUpdateOperation operation) {
public void awlanNodeDbTableUpdate(TableUpdates tableUpdates, String connectedClientId) {
// TODO Auto-generated method stub
}

View File

@@ -1,11 +1,8 @@
package com.telecominfraproject.wlan.opensync.ovsdb;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PostConstruct;
@@ -30,9 +27,7 @@ import com.vmware.ovsdb.exception.OvsdbClientException;
import com.vmware.ovsdb.protocol.methods.MonitorRequest;
import com.vmware.ovsdb.protocol.methods.MonitorRequests;
import com.vmware.ovsdb.protocol.methods.MonitorSelect;
import com.vmware.ovsdb.protocol.methods.RowUpdate;
import com.vmware.ovsdb.protocol.methods.TableUpdates;
import com.vmware.ovsdb.protocol.operation.notation.Row;
import com.vmware.ovsdb.service.OvsdbClient;
import com.vmware.ovsdb.service.OvsdbPassiveConnectionListener;
@@ -103,379 +98,7 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
LOG.info("ovsdbClient connectedClients = {}",
ConnectusOvsdbClient.this.ovsdbSessionMapInterface.getNumSessions());
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiRadioStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiRadioStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_RADIO_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, true, true, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
// initial, insert, modify,
// null if delete
Row newRow = rowUpdates.get(uuid).getNew();
// delete, modify, null if
// init or insert
Row oldRow = rowUpdates.get(uuid).getOld();
Map<String, String> rowContents = new HashMap<String, String>();
if (oldRow == null) {
Set<Long> channel = newRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("ht_mode",
ovsdbDao.getSingleValueFromSet(newRow, "ht_mode"));
rowContents.put("hw_mode", newRow.getStringColumn("hw_mode"));
rowContents.put("hw_type", newRow.getStringColumn("hw_type"));
rowContents.put("hw_params", newRow.getStringColumn("hw_type"));
rowContents.put("mac", newRow.getStringColumn("mac"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("freq_band",
ovsdbDao.getSingleValueFromSet(newRow, "freq_band"));
rowContents.put("country",
newRow.getSetColumn("country").toString());
extIntegrationInterface.wifiRadioStatusDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.INSERT);
} else if (newRow == null) {
Set<Long> channel = oldRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("ht_mode",
ovsdbDao.getSingleValueFromSet(oldRow, "ht_mode"));
rowContents.put("hw_mode", oldRow.getStringColumn("hw_mode"));
rowContents.put("hw_type", oldRow.getStringColumn("hw_type"));
rowContents.put("hw_params", oldRow.getStringColumn("hw_type"));
rowContents.put("mac", oldRow.getStringColumn("mac"));
rowContents.put("if_name", oldRow.getStringColumn("if_name"));
rowContents.put("freq_band",
ovsdbDao.getSingleValueFromSet(oldRow, "freq_band"));
rowContents.put("country",
oldRow.getSetColumn("country").toString());
extIntegrationInterface.wifiRadioStatusDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.DELETE);
} else if (oldRow != null && newRow.getColumns().keySet()
.containsAll(oldRow.getColumns().keySet())) {
Set<Long> channel = newRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("ht_mode",
ovsdbDao.getSingleValueFromSet(newRow, "ht_mode"));
rowContents.put("hw_mode", newRow.getStringColumn("hw_mode"));
rowContents.put("hw_type", newRow.getStringColumn("hw_type"));
rowContents.put("hw_params", newRow.getStringColumn("hw_type"));
rowContents.put("mac", newRow.getStringColumn("mac"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("freq_band",
ovsdbDao.getSingleValueFromSet(newRow, "freq_band"));
rowContents.put("country",
newRow.getSetColumn("country").toString());
extIntegrationInterface.wifiRadioStatusDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.MODIFY);
}
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiInetStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiInetStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_INET_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, true, true, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
// initial, insert, modify,
// null if delete
Row newRow = rowUpdates.get(uuid).getNew();
// delete, modify, null if
// init or insert
Row oldRow = rowUpdates.get(uuid).getOld();
Map<String, String> rowContents = new HashMap<String, String>();
if (oldRow == null) {
rowContents.put("if_type",
ovsdbDao.getSingleValueFromSet(newRow, "if_type"));
rowContents.put("hwaddr", newRow.getStringColumn("hwaddr"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("inet_addr",
newRow.getStringColumn("inet_addr"));
rowContents.put("dhcpc",
newRow.getMapColumn("dhcpc").toString());
rowContents.put("network",
newRow.getBooleanColumn("network").toString());
extIntegrationInterface.wifiInetStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.INSERT);
} else if (newRow == null) {
rowContents.put("if_type",
ovsdbDao.getSingleValueFromSet(oldRow, "if_type"));
rowContents.put("hwaddr", oldRow.getStringColumn("hwaddr"));
rowContents.put("if_name", oldRow.getStringColumn("if_name"));
rowContents.put("inet_addr",
oldRow.getStringColumn("inet_addr"));
rowContents.put("dhcpc",
oldRow.getMapColumn("dhcpc").toString());
rowContents.put("network",
oldRow.getBooleanColumn("network").toString());
extIntegrationInterface.wifiInetStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.DELETE);
} else if (newRow.getColumns().keySet()
.containsAll(oldRow.getColumns().keySet())) {
rowContents.put("if_type",
ovsdbDao.getSingleValueFromSet(newRow, "if_type"));
rowContents.put("hwaddr", newRow.getStringColumn("hwaddr"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("inet_addr",
newRow.getStringColumn("inet_addr"));
rowContents.put("dhcpc",
newRow.getMapColumn("dhcpc").toString());
rowContents.put("network",
newRow.getBooleanColumn("network").toString());
extIntegrationInterface.wifiInetStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.MODIFY);
}
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiVifStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiVifStateDbTable,
new MonitorRequest(Arrays.asList(WIFI_VIF_STATE_DB_TABLE_COLUMNS),
new MonitorSelect(true, true, true, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
// initial, insert, modify,
// null if delete
Row newRow = rowUpdates.get(uuid).getNew();
// delete, modify, null if
// init or insert
Row oldRow = rowUpdates.get(uuid).getOld();
Map<String, String> rowContents = new HashMap<String, String>();
if (oldRow == null) {
rowContents.put("min_hw_mode",
ovsdbDao.getSingleValueFromSet(newRow, "min_hw_mode"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("security",
newRow.getMapColumn("security").toString());
rowContents.put("bridge",
newRow.getSetColumn("bridge").toString());
Set<Long> channel = newRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("enabled",
newRow.getBooleanColumn("enabled").toString());
rowContents.put("ssid_broadcast", ovsdbDao
.getSingleValueFromSet(newRow, "ssid_broadcast"));
rowContents.put("mac", newRow.getStringColumn("mac"));
rowContents.put("ssid", newRow.getStringColumn("ssid"));
extIntegrationInterface.wifiVIFStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.INSERT);
} else if (newRow == null) {
rowContents.put("min_hw_mode",
ovsdbDao.getSingleValueFromSet(oldRow, "min_hw_mode"));
rowContents.put("if_name", oldRow.getStringColumn("if_name"));
rowContents.put("security",
oldRow.getMapColumn("security").toString());
rowContents.put("bridge",
oldRow.getSetColumn("bridge").toString());
Set<Long> channel = oldRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("enabled",
oldRow.getBooleanColumn("enabled").toString());
rowContents.put("ssid_broadcast", ovsdbDao
.getSingleValueFromSet(oldRow, "ssid_broadcast"));
rowContents.put("mac", oldRow.getStringColumn("mac"));
rowContents.put("ssid", oldRow.getStringColumn("ssid"));
extIntegrationInterface.wifiVIFStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.DELETE);
} else if (newRow.getColumns().keySet()
.containsAll(oldRow.getColumns().keySet())) {
rowContents.put("min_hw_mode",
ovsdbDao.getSingleValueFromSet(newRow, "min_hw_mode"));
rowContents.put("if_name", newRow.getStringColumn("if_name"));
rowContents.put("security",
newRow.getMapColumn("security").toString());
rowContents.put("bridge",
newRow.getSetColumn("bridge").toString());
Set<Long> channel = newRow.getSetColumn("channel");
if (!channel.isEmpty()) {
rowContents.put("channel",
channel.stream().iterator().next().toString());
} else {
rowContents.put("channel", "");
}
rowContents.put("enabled",
newRow.getBooleanColumn("enabled").toString());
rowContents.put("ssid_broadcast", ovsdbDao
.getSingleValueFromSet(newRow, "ssid_broadcast"));
rowContents.put("mac", newRow.getStringColumn("mac"));
rowContents.put("ssid", newRow.getStringColumn("ssid"));
extIntegrationInterface.wifiVIFStateDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.MODIFY);
}
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.awlanNodeDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.awlanNodeDbTable,
new MonitorRequest(Arrays.asList(AWLAN_NODE_DB_TABLE_COLUMNS),
new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
// initial, insert, modify,
// null if delete
Row newRow = rowUpdates.get(uuid).getNew();
// delete, modify, null if
// init or insert
Row oldRow = rowUpdates.get(uuid).getOld();
Map<String, String> rowContents = new HashMap<String, String>();
if (oldRow != null && newRow.getColumns().keySet()
.containsAll(oldRow.getColumns().keySet())) {
rowContents.put("device_mode",
ovsdbDao.getSingleValueFromSet(newRow, "device_mode"));
rowContents.put("id", newRow.getStringColumn("id"));
rowContents.put("model", newRow.getStringColumn("model"));
rowContents.put("serial_number",
newRow.getStringColumn("serial_number"));
rowContents.put("firmware_version",
newRow.getStringColumn("firmware_version"));
rowContents.put("platform_version",
newRow.getStringColumn("platform_version"));
rowContents.put("redirector_addr",
newRow.getStringColumn("redirector_addr"));
extIntegrationInterface.awlan_NodeDbTableUpdate(rowContents,
key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.MODIFY);
}
}
});
}
});
ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiAssociatedClientsDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiAssociatedClientsDbTable,
new MonitorRequest(Arrays.asList(WIFI_ASSOCIATED_CLIENTS_DB_TABLE_COLUMNS),
new MonitorSelect(true, true, true, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
tableUpdates.getTableUpdates().entrySet().stream().forEach(e -> {
Map<UUID, RowUpdate> rowUpdates = e.getValue().getRowUpdates();
for (UUID uuid : rowUpdates.keySet()) {
// initial, insert, modify,
// null if delete
Row newRow = rowUpdates.get(uuid).getNew();
// delete, modify, null if
// init or insert
Row oldRow = rowUpdates.get(uuid).getOld();
Map<String, String> rowContents = new HashMap<String, String>();
if (oldRow == null) {
rowContents.put("mac", newRow.getStringColumn("mac"));
extIntegrationInterface.wifiAssociatedClientsDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.INSERT);
} else if (newRow == null) {
rowContents.put("mac", oldRow.getStringColumn("mac"));
extIntegrationInterface.wifiAssociatedClientsDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.DELETE);
} else if (oldRow != null && newRow.getColumns().keySet()
.containsAll(oldRow.getColumns().keySet())) {
rowContents.put("mac", newRow.getStringColumn("mac"));
extIntegrationInterface.wifiAssociatedClientsDbTableUpdate(
rowContents, key,
OpensyncExternalIntegrationInterface.RowUpdateOperation.MODIFY);
}
}
});
}
});
monitorOvsdbStateTables(ovsdbClient, key);
} catch (Exception e) {
LOG.error("ovsdbClient error", e);
@@ -518,7 +141,6 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
ovsdbClient.cancelMonitor(OvsdbDao.wifiRadioStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.wifiVifStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.wifiInetStateDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.awlanNodeDbTable);
ovsdbClient.cancelMonitor(OvsdbDao.wifiAssociatedClientsDbTable);
} catch (OvsdbClientException e) {
LOG.warn("Could not cancel Monitor {}", e);
@@ -611,4 +233,80 @@ public class ConnectusOvsdbClient implements ConnectusOvsdbClientInterface {
LOG.debug("Finished processConfigChanged for {}", apId);
}
private void monitorOvsdbStateTables(OvsdbClient ovsdbClient, String key) throws OvsdbClientException {
CompletableFuture<TableUpdates> rsCf = ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiRadioStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiRadioStateDbTable,
new MonitorRequest(new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
extIntegrationInterface.wifiRadioStatusDbTableUpdate(tableUpdates, key);
}
});
extIntegrationInterface.wifiRadioStatusDbTableUpdate(rsCf.join(), key);
CompletableFuture<TableUpdates> isCf = ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiInetStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiInetStateDbTable,
new MonitorRequest(new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
extIntegrationInterface.wifiInetStateDbTableUpdate(tableUpdates, key);
}
});
extIntegrationInterface.wifiInetStateDbTableUpdate(isCf.join(), key);
CompletableFuture<TableUpdates> vsCf = ovsdbClient
.monitor(OvsdbDao.ovsdbName, OvsdbDao.wifiVifStateDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiVifStateDbTable,
new MonitorRequest(new MonitorSelect(true, false, false, true)))),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
extIntegrationInterface.wifiVIFStateDbTableUpdate(tableUpdates, key);
}
});
extIntegrationInterface.wifiVIFStateDbTableUpdate(vsCf.join(), key);
CompletableFuture<TableUpdates> acCf = ovsdbClient.monitor(OvsdbDao.ovsdbName,
OvsdbDao.wifiAssociatedClientsDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.wifiAssociatedClientsDbTable, new MonitorRequest())),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
extIntegrationInterface.wifiAssociatedClientsDbTableUpdate(tableUpdates, key);
}
});
extIntegrationInterface.wifiAssociatedClientsDbTableUpdate(acCf.join(), key);
CompletableFuture<TableUpdates> awCf = ovsdbClient.monitor(OvsdbDao.ovsdbName, OvsdbDao.awlanNodeDbTable,
new MonitorRequests(ImmutableMap.of(OvsdbDao.awlanNodeDbTable, new MonitorRequest())),
new MonitorCallback() {
@Override
public void update(TableUpdates tableUpdates) {
extIntegrationInterface.awlanNodeDbTableUpdate(tableUpdates, key);
}
});
extIntegrationInterface.awlanNodeDbTableUpdate(awCf.join(), key);
}
}