TW-20 - Read AP config from KDC instance - part 2

This commit is contained in:
DTop
2020-02-09 23:03:31 -05:00
parent 22106f9c75
commit c581681dc6
9 changed files with 1177 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
package ai.connectus.opensync.external.integration;
import com.vmware.ovsdb.service.OvsdbClient;
public class OvsdbSession {
private OvsdbClient ovsdbClient;
private String apId;
private long routingId;
private long equipmentId;
private int customerId;
public OvsdbClient getOvsdbClient() {
return ovsdbClient;
}
public void setOvsdbClient(OvsdbClient ovsdbClient) {
this.ovsdbClient = ovsdbClient;
}
public String getApId() {
return apId;
}
public void setApId(String apId) {
this.apId = apId;
}
public long getRoutingId() {
return routingId;
}
public void setRoutingId(long routingId) {
this.routingId = routingId;
}
public long getEquipmentId() {
return equipmentId;
}
public void setEquipmentId(long equipmentId) {
this.equipmentId = equipmentId;
}
public int getCustomerId() {
return customerId;
}
public void setCustomerId(int customerId) {
this.customerId = customerId;
}
}

View File

@@ -0,0 +1,15 @@
package ai.connectus.opensync.external.integration;
import java.util.Set;
import com.vmware.ovsdb.service.OvsdbClient;
public interface OvsdbSessionMapInterface {
OvsdbSession getSession(String apId);
OvsdbSession removeSession(String apId);
void closeSession(String apId);
OvsdbSession newSession(String apId, OvsdbClient ovsdbClient);
int getNumSessions();
Set<String> getConnectedClientIds();
String lookupClientId(OvsdbClient ovsdbClient);
}

147
opensync_ext_kdc/pom.xml Normal file
View File

@@ -0,0 +1,147 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.whizcontrol</groupId>
<artifactId>root-pom</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>ai.connectus</groupId>
<artifactId>opensync_ext_kdc</artifactId>
<name>opensync_ext_kdc</name>
<description>Component that reads AP configuration from the KDC deployment, and registers/deregisters APs in there.</description>
<dependencies>
<dependency>
<groupId>ai.connectus</groupId>
<artifactId>opensync_ext_interface</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<artifactId>base-container</artifactId>
<groupId>com.whizcontrol</groupId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-configuration-manager-service-remote</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-and-network-management-service-remote</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-routing-service-remote</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>order-and-subscription-management-service-remote</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- KDC models -->
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>service-metrics-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>rule-engine-partitioning-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-and-network-status-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>customer-account-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>service-subscription-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-inventory-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-routing-info-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-and-network-config-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>radio-resource-planner-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>system-event-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>issue-and-action-report-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>preferences-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>cami-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>ml-results-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>client-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>rogue-ap-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>msp-account-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>equipment-configuration-manager-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>email-notification-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
<dependency>
<groupId>com.whizcontrol</groupId>
<artifactId>recommendation-models</artifactId>
<version>${whizcontrol.release.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,368 @@
package ai.connectus.opensync.external.integration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import com.whizcontrol.core.model.equipment.EquipmentType;
import com.whizcontrol.core.model.equipment.RadioType;
import com.whizcontrol.equipmentandnetworkconfig.models.ApElementConfiguration;
import com.whizcontrol.equipmentandnetworkconfig.models.ApElementConfiguration.ApModel;
import com.whizcontrol.equipmentandnetworkconfig.models.CountryCode;
import com.whizcontrol.equipmentandnetworkconfig.models.DeviceMode;
import com.whizcontrol.equipmentandnetworkconfig.models.ElementRadioConfiguration;
import com.whizcontrol.equipmentandnetworkconfig.models.EquipmentElementConfiguration;
import com.whizcontrol.equipmentandnetworkconfig.models.GettingDNS;
import com.whizcontrol.equipmentandnetworkconfig.models.GettingIP;
import com.whizcontrol.equipmentandnetworkconfig.models.SsidConfiguration;
import com.whizcontrol.equipmentandnetworkconfig.models.SsidConfiguration.AppliedRadio;
import com.whizcontrol.equipmentandnetworkconfig.models.SsidConfiguration.SecureMode;
import com.whizcontrol.equipmentandnetworkconfig.models.StateSetting;
import com.whizcontrol.equipmentandnetworkmanagement.EquipmentAndNetworkManagementInterface;
import com.whizcontrol.equipmentconfigurationmanager.EquipmentConfigurationManagerInterface;
import com.whizcontrol.equipmentconfigurationmanager.models.ResolvedEquipmentConfiguration;
import com.whizcontrol.equipmentinventory.models.CustomerEquipment;
import com.whizcontrol.equipmentrouting.EquipmentRoutingInterface;
import com.whizcontrol.equipmentroutinginfo.models.EquipmentRoutingRecord;
import com.whizcontrol.orderandsubscriptionmanagement.OrderAndSubscriptionManagementInterface;
import ai.connectus.opensync.external.integration.controller.OpensyncKDCGatewayController;
import ai.connectus.opensync.external.integration.models.OpensyncAPConfig;
import ai.connectus.opensync.external.integration.models.OpensyncAPRadioConfig;
import ai.connectus.opensync.external.integration.models.OpensyncAPSsidConfig;
import sts.PlumeStats.Report;
import traffic.NetworkMetadata.FlowReport;
import wc.stats.IpDnsTelemetry.WCStatsReport;
@Profile("opensync_kdc_config")
@Component
public class OpensyncExternalIntegrationKDC implements OpensyncExternalIntegrationInterface {
private static final Logger LOG = LoggerFactory.getLogger(OpensyncExternalIntegrationKDC.class);
@Autowired
private EquipmentAndNetworkManagementInterface eqNetworkManagementInterface;
@Autowired
private EquipmentConfigurationManagerInterface eqConfigurationManagerInterface;
@Autowired
private OrderAndSubscriptionManagementInterface orderAndSubscriptionManagementInterface;
/**
* Equipment routing provide the qrCode to CE gateway mapping
*/
@Autowired
private EquipmentRoutingInterface eqRoutingInterface;
@Autowired
private OpensyncKDCGatewayController kdcGwController;
@Autowired
private OvsdbSessionMapInterface ovsdbSessionMapInterface;
@Autowired
private CacheManager cacheManagerShortLived;
@Value("${connectus.ovsdb.autoProvisionedCustomerId:1004}")
private int autoProvisionedCustomerId;
@Value("${connectus.ovsdb.autoProvisionedLocationId:2}")
private int autoProvisionedLocationId;
@Value("${connectus.ovsdb.autoProvisionedNetworkConfigId:1}")
private long autoProvisionedNetworkConfigId;
private Cache kdcEquipmentRecordCache;
@PostConstruct
private void postCreate(){
LOG.info("Using KDC integration");
kdcEquipmentRecordCache = cacheManagerShortLived.getCache("KDC_equipment_record_cache");
}
public void apConnected(String apId) {
LOG.info("AP {} got connected to the gateway", apId);
try {
CustomerEquipment ce = null;
try {
ce = kdcEquipmentRecordCache.get(apId, new Callable<CustomerEquipment>() {
@Override
public CustomerEquipment call() throws Exception {
return eqNetworkManagementInterface.getCustomerEquipmentByQrCodeOrNull(apId);
}
});
}catch (Exception e) {
//do nothing
}
if(ce == null) {
//auto-provision APs for the demo
//we'll use hardcoded customerId/locationId/networkConfigId for the new equipment
ce = new CustomerEquipment();
ce.setEquipmentType(EquipmentType.AP);
ce.setModelId("plume");
ce.setQrCode(apId);
ce = eqNetworkManagementInterface.createCustomerEquipment(ce);
//bind newly created CE to specified customer
Set<Long> equipmentIdsToAdd = new HashSet<Long>();
equipmentIdsToAdd.add(ce.getId());
orderAndSubscriptionManagementInterface.bindEquipment(autoProvisionedCustomerId, equipmentIdsToAdd);
// now update CE record itself
ce.setName(apId);
ce.setCustomerId(autoProvisionedCustomerId);
ce.setEquipmentNetworkConfigId(autoProvisionedNetworkConfigId);
ce.setLocation(autoProvisionedLocationId);
ce = eqNetworkManagementInterface.updateCustomerEquipment(ce);
// create the element configuration
LOG.debug("Creating element configuration for AP({})", apId);
EquipmentElementConfiguration equipmentElementConfiguration = new EquipmentElementConfiguration();
EquipmentType equipmentType = ce.getEquipmentType();
equipmentElementConfiguration.setEquipmentId(ce.getId());
equipmentElementConfiguration.setEquipmentType(equipmentType);
equipmentElementConfiguration.setCustomerId(ce.getCustomerId());
equipmentElementConfiguration.setElementConfigVersion("" + equipmentType + "-V1");
ApElementConfiguration apElementConfiguration = ApElementConfiguration.createWithDefaults(
equipmentElementConfiguration.getElementConfigVersion(),
ApModel.INDOOR);
apElementConfiguration.setGettingIP(GettingIP.dhcp);
apElementConfiguration.setGettingDNS(GettingDNS.dhcp);
apElementConfiguration.setDeviceMode(DeviceMode.standaloneAP);
equipmentElementConfiguration.setApElementConfig(apElementConfiguration);
equipmentElementConfiguration = eqNetworkManagementInterface
.createEquipmentElementConfiguration(equipmentElementConfiguration);
//cache newly created AP
kdcEquipmentRecordCache.put(apId, ce);
}
EquipmentRoutingRecord equipmentRoutingRecord = new EquipmentRoutingRecord();
equipmentRoutingRecord.setGatewayRecordId(kdcGwController.getRegisteredGwId());
equipmentRoutingRecord.setCustomerId(ce.getCustomerId());
equipmentRoutingRecord.setEquipmentId(ce.getId());
EquipmentRoutingRecord ret = eqRoutingInterface.registerUERoute(equipmentRoutingRecord);
OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId);
ovsdbSession.setRoutingId(ret.getId());
ovsdbSession.setEquipmentId(ce.getId());
ovsdbSession.setCustomerId(ce.getCustomerId());
}catch(Exception e) {
LOG.error("Exception when registering ap routing {}", apId, e);
}
}
public void apDisconnected(String apId) {
LOG.info("AP {} got disconnected from the gateway", apId);
try {
OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId);
if(ovsdbSession!=null) {
eqRoutingInterface.deregisterUserEquipment(ovsdbSession.getEquipmentId());
} else {
LOG.warn("Cannot find ap {} in KDC inventory", apId);
}
}catch(Exception e) {
LOG.error("Exception when registering ap routing {}", apId, e);
}
}
public OpensyncAPConfig getApConfig(String apId) {
LOG.info("Retrieving config for AP {} ", apId);
OpensyncAPConfig ret = null;
try {
OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId);
if(ovsdbSession == null) {
throw new IllegalStateException("AP is not connected " + apId);
}
long equipmentId = ovsdbSession.getEquipmentId();
ResolvedEquipmentConfiguration resolvedEqCfg = eqConfigurationManagerInterface.getResolvedEquipmentConfiguration(equipmentId, null, null);
if(resolvedEqCfg==null) {
throw new IllegalStateException("Cannot retrieve configuration for " + apId);
}
ret = new OpensyncAPConfig();
//extract country, radio channels from resolvedEqCfg
String country = "CA";
CountryCode countryCode = resolvedEqCfg.getEquipmentCountryCode();
if(countryCode!=null && countryCode!=CountryCode.UNSUPPORTED) {
country = countryCode.toString().toUpperCase();
}
int radioChannel24G = 1;
int radioChannel5LG = 44;
Map<Integer, ElementRadioConfiguration> erc = resolvedEqCfg.getEquipmentElementConfiguration().getApElementConfig().getRadioMap();
if(erc!=null) {
ElementRadioConfiguration erc24 = erc.get(1);
ElementRadioConfiguration erc5 = erc.get(0);
if(erc24!=null) {
radioChannel24G = erc24.getChannelNumber();
}
if(erc5!=null) {
radioChannel5LG = erc5.getChannelNumber();
}
}
OpensyncAPRadioConfig radioConfig = new OpensyncAPRadioConfig();
radioConfig.setCountry(country);
radioConfig.setRadioChannel24G(radioChannel24G);
radioConfig.setRadioChannel5LG(radioChannel5LG);
//hardcoding this one, as there are no config for it in KDC
radioConfig.setRadioChannel5HG(108);
ret.setRadioConfig(radioConfig);
//extract ssid parameters from resolvedEqCfg
List<OpensyncAPSsidConfig> ssidConfigs = new ArrayList<>();
ret.setSsidConfigs(ssidConfigs);
List<SsidConfiguration> resolvedSsids = resolvedEqCfg.getSsidConfigurations();
if(resolvedSsids!=null) {
for(SsidConfiguration ssidCfg : resolvedSsids) {
OpensyncAPSsidConfig osSsidCfg = new OpensyncAPSsidConfig();
osSsidCfg.setSsid(ssidCfg.getSsid());
AppliedRadio ar = ssidCfg.getAppliedRadio();
if(ar == AppliedRadio.radioA) {
osSsidCfg.setRadioType(RadioType.is5GHz);
} else if (ar == AppliedRadio.radioB ){
osSsidCfg.setRadioType(RadioType.is2dot4GHz);
} else if (ar == AppliedRadio.radioAandB ){
osSsidCfg.setRadioType(RadioType.is5GHz);
}
osSsidCfg.setBroadcast(ssidCfg.getBroadcastSsid() == StateSetting.enabled);
if(ssidCfg.getSecureMode() == SecureMode.wpa2OnlyPSK || ssidCfg.getSecureMode() == SecureMode.wpa2PSK) {
osSsidCfg.setEncryption("WPA-PSK");
osSsidCfg.setMode("2");
} else if(ssidCfg.getSecureMode() == SecureMode.wpaPSK ) {
osSsidCfg.setEncryption("WPA-PSK");
osSsidCfg.setMode("1");
} else {
LOG.warn("Unsupported encryption mode {} - will use WPA-PSK instead", ssidCfg.getSecureMode());
osSsidCfg.setEncryption("WPA-PSK");
osSsidCfg.setMode("2");
}
osSsidCfg.setKey(ssidCfg.getKeyStr());
ssidConfigs.add(osSsidCfg);
if( ar==AppliedRadio.radioAandB ) {
//configure the same ssid on the second radio
osSsidCfg = osSsidCfg.clone();
osSsidCfg.setRadioType(RadioType.is2dot4GHz);
ssidConfigs.add(osSsidCfg);
}
}
}
} catch (Exception e) {
LOG.error("Cannot read config for AP {}", apId, e);
}
LOG.debug("Config content : {}", ret);
return ret;
}
/**
* @param topic
* @return apId extracted from the topic name, or null if it cannot be extracted
*/
public String extractApIdFromTopic(String topic) {
//Topic is formatted as "/ap/"+clientCn+"_"+ret.serialNumber+"/opensync"
if(topic==null) {
return null;
}
String[] parts = topic.split("/");
if(parts.length<3) {
return null;
}
//apId is the second element in the topic
return parts[1];
}
/**
* @param topic
* @return customerId looked up from the topic name, or -1 if it cannot be extracted
*/
public int extractCustomerIdFromTopic(String topic) {
String apId = extractApIdFromTopic(topic);
if(apId == null) {
return -1;
}
OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId);
if(ovsdbSession!=null) {
return ovsdbSession.getCustomerId();
}
return -1;
}
public void processMqttMessage(String topic, Report report) {
LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID());
int customerId = extractCustomerIdFromTopic(topic);
if(customerId>0) {
kdcGwController.updateActiveCustomer(customerId);
}
}
public void processMqttMessage(String topic, FlowReport flowReport) {
LOG.info("Received flowReport on topic {} for ap {}", topic, flowReport.getObservationPoint().getNodeId());
int customerId = extractCustomerIdFromTopic(topic);
if(customerId>0) {
kdcGwController.updateActiveCustomer(customerId);
}
}
public void processMqttMessage(String topic, WCStatsReport wcStatsReport) {
LOG.info("Received wcStatsReport on topic {} for ap {}", topic, wcStatsReport.getObservationPoint().getNodeId());
int customerId = extractCustomerIdFromTopic(topic);
if(customerId>0) {
kdcGwController.updateActiveCustomer(customerId);
}
}
}

View File

@@ -0,0 +1,29 @@
package ai.connectus.opensync.external.integration.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStartedEvent;
/**
* Listen for context started event so that we are register with routing service
*
* @author yongli
*
*/
public class OpensyncGatewayControllerStartListener implements ApplicationListener<ContextStartedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(OpensyncGatewayControllerStartListener.class);
OpensyncKDCGatewayController controller;
public OpensyncGatewayControllerStartListener(OpensyncKDCGatewayController controller) {
this.controller = controller;
}
@Override
public void onApplicationEvent(ContextStartedEvent event) {
LOG.debug("Processing ContextStartedEvent event");
controller.registerWithRoutingService();
}
}

View File

@@ -0,0 +1,29 @@
package ai.connectus.opensync.external.integration.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
/**
* Register for stop event so that we can de-register from routing service
*
* @author yongli
*
*/
public class OpensyncGatewayControllerStopListener implements ApplicationListener<ContextClosedEvent> {
OpensyncKDCGatewayController controller;
private static final Logger LOG = LoggerFactory.getLogger(OpensyncGatewayControllerStopListener.class);
public OpensyncGatewayControllerStopListener(OpensyncKDCGatewayController controller) {
this.controller = controller;
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
LOG.debug("Processing ContextClosedEvent event");
controller.deregisterFromRoutingService();
}
}

View File

@@ -0,0 +1,29 @@
package ai.connectus.opensync.external.integration.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.stereotype.Component;
@Component
public class OpensyncGatewayListenerConfiguration {
@Autowired
OpensyncKDCGatewayController controller;
private static final Logger LOG = LoggerFactory.getLogger(OpensyncGatewayControllerStartListener.class);
@Bean
public ApplicationListener<ContextClosedEvent> myStopEventListner() {
LOG.debug("Creating stop event listener");
return new OpensyncGatewayControllerStopListener(controller);
}
@Bean
public ApplicationListener<ContextStartedEvent> myStartedEventListener() {
LOG.debug("Creating start event listener");
return new OpensyncGatewayControllerStartListener(controller);
}
}

View File

@@ -0,0 +1,432 @@
package ai.connectus.opensync.external.integration.controller;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.web.bind.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.whizcontrol.core.model.json.BaseJsonModel;
import com.whizcontrol.core.model.service.CloudServiceType;
import com.whizcontrol.core.model.service.ServiceInstanceInformation;
import com.whizcontrol.core.server.container.ConnectorProperties;
import com.whizcontrol.customerequipmentgateway.models.CEGWBaseCommand;
import com.whizcontrol.customerequipmentgateway.models.CEGWCloseSessionRequest;
import com.whizcontrol.customerequipmentgateway.models.CEGWCommandResultCode;
import com.whizcontrol.customerequipmentgateway.models.CEGWConfigChangeNotification;
import com.whizcontrol.customerequipmentgateway.models.CEGWRouteCheck;
import com.whizcontrol.customerequipmentgateway.models.CustomerEquipmentCommand;
import com.whizcontrol.customerequipmentgateway.models.CustomerEquipmentCommandResponse;
import com.whizcontrol.customerequipmentgateway.models.GatewayDefaults;
import com.whizcontrol.equipmentrouting.EquipmentRoutingInterface;
import com.whizcontrol.equipmentroutinginfo.models.CustomerEquipmentGwRecord;
import com.whizcontrol.equipmentroutinginfo.models.EquipmentRoutingRecord;
import com.whizcontrol.equipmentroutinginfo.models.RoutingRegisterResponse;
import com.whizcontrol.server.exceptions.ConfigurationException;
import ai.connectus.opensync.external.integration.OvsdbSession;
import ai.connectus.opensync.external.integration.OvsdbSessionMapInterface;
/**
* Opensync Gateway Controller - integration code for Kodacloud deployment
*
* @author yongli
* @author dtop
*
*/
@RestController
@EnableScheduling
public class OpensyncKDCGatewayController {
@Autowired
EquipmentRoutingInterface eqRoutingSvc;
@Autowired
ConnectorProperties connectorProperties;
@Autowired
private ServiceInstanceInformation serviceInstanceInfo;
/**
* Flag indicates if this gateway has registered with routing service
*/
private boolean registeredWithRoutingService = false;
private long registeredGwId = -1;
/**
* Lock used to protected {@link #activeCustomerLock}
*/
private final ReadWriteLock activeCustomerLock = new ReentrantReadWriteLock();
private final Lock activeCustomerReadLock = activeCustomerLock.readLock();
private final Lock activeCustomerWriteLock = activeCustomerLock.writeLock();
@Autowired
private OvsdbSessionMapInterface ovsdbSessionMapInterface;
/**
* Map <customerId, lastSeenTimestamp>
*/
private ConcurrentMap<Integer, Long> activeCustomerMap = new ConcurrentHashMap<>();
/**
* latestTimetamp used when updating {@link #activeCustomerMap}
*/
private final BiFunction<Long, Long, Long> latestTimestamp = new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long oldValue, Long newValue) {
if (newValue.compareTo(oldValue) > 0) {
return newValue;
}
return oldValue;
}
};
private static final Logger LOG = LoggerFactory.getLogger(OpensyncKDCGatewayController.class);
@RequestMapping(value = "/command", method = RequestMethod.POST)
public CustomerEquipmentCommandResponse sendCommand(@RequestBody CEGWBaseCommand command) {
LOG.debug("sendCommand({})", command);
String qrCode = command.getEquipmentQRCode();
if (BaseJsonModel.hasUnsupportedValue(command)) {
LOG.error("[{}] Failed to deliver command {}, command contains unsupported value", qrCode, command);
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.UnsupportedCommand.ordinal(),
"Unsupported value in command for " + qrCode);
}
OvsdbSession session = ovsdbSessionMapInterface.getSession(qrCode);
if (session == null) {
LOG.warn("[{}] Failed to deliver command {}, websocket session not found", qrCode, command);
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.NoRouteToCE.ordinal(),
"No session found for " + qrCode);
}
switch (command.getCommandType()) {
case ConfigChangeNotification:
return sendConfigChangeNotification(session, (CEGWConfigChangeNotification) command);
case CloseSessionRequest:
return closeSession(session, (CEGWCloseSessionRequest) command);
case CheckRouting:
return checkEquipmentRouting(session, (CEGWRouteCheck) command);
case StartDebugEngine:
case StopDebugEngine:
case FirmwareDownloadRequest:
case FirmwareFlashRequest:
case RebootRequest:
case BlinkRequest:
case NeighbourhoodReport:
case ClientDeauthRequest:
case CellSizeRequest:
case NewChannelRequest:
case ReportCurrentAPCRequest:
case FileUpdateRequest:
case InterferenceThresholdUpdateRequest:
case BestApConfigurationUpdateRequest:
case StartPacketFileCapture:
case StopPacketCapture:
case ReportCurrentVLANRequest:
case BSSIDToMacMonitoringRequest:
case ChannelChangeAnnouncementRequest:
case UploadLogFile:
case TogglePoERequest:
case RadioReset:
case ClearScanTable:
case WdsRequest:
default:
LOG.warn("[{}] Failed to deliver command {}, unsupported command type", qrCode, command);
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.UnsupportedCommand.ordinal(),
"Invalid command type (" + command.getCommandType() + ") for equipment (" + qrCode + ")");
}
}
@RequestMapping(value = "/defaults", method = RequestMethod.GET)
public GatewayDefaults retrieveGatewayDefaults()
{
return new GatewayDefaults();
}
/**
* Verify a route to customer equipment
*
* @param session
* @param command
* @param protocolVersion
* @return NoRouteToCE if route Id does not match or Success
*/
private CustomerEquipmentCommandResponse checkEquipmentRouting(OvsdbSession session, CEGWRouteCheck command) {
if (null != command.getRoutingId()) {
if (!command.getRoutingId().equals(session.getRoutingId())) {
LOG.info("[C:{} E:{} R:{}] Stale routing entry ({}) detected",
session.getCustomerId(), command.getEquipmentQRCode(),
session.getRoutingId(), command.getRoutingId());
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.NoRouteToCE.ordinal(),
"Inactive Route Identifer");
}
}
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.Success.ordinal(), "Route active");
}
private CustomerEquipmentCommandResponse sendConfigChangeNotification(OvsdbSession session,
CEGWConfigChangeNotification command) {
return sendMessage(session, command.getEquipmentQRCode(), command);
}
private CustomerEquipmentCommandResponse closeSession(OvsdbSession session, CEGWCloseSessionRequest command) {
try {
session.getOvsdbClient().shutdown();
} catch (Exception e) {
LOG.error("[{}] Failed to close session on CE: {}", command.getEquipmentQRCode(), e.getLocalizedMessage());
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.FailedToSend.ordinal(),
"Failed to send command " + command.getCommandType() + " to " + command.getEquipmentQRCode() + ": "
+ e.getMessage());
}
LOG.debug("[{}] Closed session to CE", command.getEquipmentQRCode());
return new CustomerEquipmentCommandResponse(CEGWCommandResultCode.Success.ordinal(),
"Closed session to " + command.getEquipmentQRCode());
}
/**
* Deliver a message in payload to the CE
*
* @param session
* @param qrCode
* @param command
* @param request
* @return
*/
private CustomerEquipmentCommandResponse sendMessage(OvsdbSession session, String qrCode,
CustomerEquipmentCommand command) {
LOG.debug("Queued command {} for {}", command.getCommandType(), qrCode);
CustomerEquipmentCommandResponse response = new CustomerEquipmentCommandResponse(CEGWCommandResultCode.Success.ordinal(),
"Queued Command " + command.getCommandType() + " to " + qrCode);
//TODO: implement sending command via ovsdb session.
response.setQueued(true);
return response;
}
@RequestMapping(value = "/commandWithUser", method = RequestMethod.POST)
public CustomerEquipmentCommandResponse sendCommandWithAuthUser(@RequestBody CustomerEquipmentCommand command,
@AuthenticationPrincipal Object requestUser, HttpServletRequest httpServletRequest) {
// use these properties to get address and port where request has
// arrived
httpServletRequest.getLocalAddr();
httpServletRequest.getLocalPort();
// requestUser will be instance of
// org.springframework.security.core.userdetails.User for client auth
// and digest auth,
// although other auth providers may return something entirely different
if (requestUser instanceof User) {
LOG.debug("calling command with auth principal: {}", ((User) requestUser).getUsername());
} else {
LOG.debug("calling command with auth principal: {}", requestUser);
}
// This is a test method to show how to get access to the auth user
// object for a given request
return sendCommand(command);
}
/**
* Register this controller with Equipment Routing Service
*/
public void registerWithRoutingService() {
synchronized (this) {
if (registeredWithRoutingService) {
return;
}
if (eqRoutingSvc == null) {
throw new ConfigurationException(
"Unabled to register gateway with routing service: routing service interface not initialized");
}
CustomerEquipmentGwRecord gwRecord = new CustomerEquipmentGwRecord(CloudServiceType.CEGW, getDeploymentId());
// Internal facing service
gwRecord.setGatewayId(getGatewayName());
gwRecord.setIpAddr(connectorProperties.getInternalIpAddress().getHostAddress());
gwRecord.setPort(connectorProperties.getInternalPort());
try {
CustomerEquipmentGwRecord result = this.eqRoutingSvc.registerGateway(gwRecord);
this.registeredGwId = result.getId();
LOG.info("Successfully registered (name={}, id={}) with Routing Service", result.getGatewayId(),
registeredGwId);
registeredWithRoutingService = true;
} catch (RuntimeException e) {
// failed
LOG.error("Failed to register Customer Equipment Gateway (name={}) with Routing Service : {}",
getGatewayName(), e.getLocalizedMessage());
}
}
}
/**
* Return the current deployment identifier
*
* @return
*/
public int getDeploymentId() {
return serviceInstanceInfo.getDeploymentId();
}
/**
* De-register from Routing service
*/
public void deregisterFromRoutingService() {
if (registeredWithRoutingService) {
try {
eqRoutingSvc.deregisterGateway(registeredGwId);
LOG.info("Deregistered Customer Equipment Gateway (name={},id={}) with Routing Service",
getGatewayName(), this.registeredGwId);
} catch (Exception e) {
// failed
LOG.error("Failed to deregister Customer Equipment Gateway (name={},id={}) with Routing Service: {}",
getGatewayName(), this.registeredGwId, e.getLocalizedMessage());
}
registeredWithRoutingService = false;
}
}
public long getRegisteredGwId() {
return this.registeredGwId;
}
/**
* Register a customer equipment with this gateway
*
* @param equipmentName
* @param customerId
* @param equipmentId
* @return associationId
*/
public RoutingRegisterResponse registerCustomerEquipment(String equipmentName, Integer customerId,
Long equipmentId) {
registerWithRoutingService();
if (!registeredWithRoutingService) {
LOG.error("Unable to register customer equipement (name={},id={}): gateway not registered.", equipmentName,
equipmentId);
return null;
}
EquipmentRoutingRecord routingRecord = new EquipmentRoutingRecord();
routingRecord.setCustomerId(customerId);
routingRecord.setEquipmentId(equipmentId);
routingRecord.setGatewayRecordId(this.registeredGwId);
try {
RoutingRegisterResponse result = eqRoutingSvc.registerUERouteForDeployment(routingRecord,
getDeploymentId());
LOG.debug("Registered customer equipment (name={},id={}) with route id={}", equipmentName, equipmentId,
result.getRoutingRecord().getId());
return result;
} catch (Exception e) {
LOG.error("Failed to register customer equipement (name={},id={}): {}", equipmentName, equipmentId,
e.getLocalizedMessage());
}
return null;
}
public void deregisterCustomerEquipment(Long routingId, String equipmentName, Long equipmentId) {
if (!registeredWithRoutingService) {
LOG.error("Unable to deregister customer equipement (name={},id={}): gateway not registered", equipmentName,
equipmentId);
return;
}
try {
LOG.debug("Deregistering customer equipment (name={},id={}) with route id={}", equipmentName, equipmentId,
routingId);
eqRoutingSvc.deregisterUERoute(routingId);
} catch (Exception e) {
LOG.error("Failed to deregister customer equipement (name={},id={}) with route id={}: {}", equipmentName,
equipmentId, routingId, e.getLocalizedMessage());
}
}
/**
* Run every 5 minutes
*/
@Scheduled(initialDelay = 5 * 60 * 1000, fixedRate = 5 * 60 * 1000)
public void updateActiveCustomer() {
try {
Map<Integer, Long> activeMap = this.getActiveCustomerMapForUpdate();
if (null != activeMap) {
LOG.info("Updating active customer records, total record size {}", activeMap.size());
this.eqRoutingSvc.updateActiveCustomer(activeMap, getDeploymentId());
}
} catch (RuntimeException exp) {
LOG.error("Failed to update active customer records due to exception {}", exp.getLocalizedMessage());
}
}
/**
* Use connection internal hostname as the gateway name
*
* @return
*/
private String getGatewayName() {
return connectorProperties.getInternalHostName();
}
/**
* Update the active timestamp for the customer
*
* @param customerId
*/
public void updateActiveCustomer(int customerId) {
this.activeCustomerReadLock.lock();
try {
this.activeCustomerMap.merge(customerId, System.currentTimeMillis(), latestTimestamp);
} finally {
this.activeCustomerReadLock.unlock();
}
}
/**
* Swap the active customer map for reporting if it contains records.
*
* @return null if no records.
*/
protected Map<Integer, Long> getActiveCustomerMapForUpdate() {
this.activeCustomerWriteLock.lock();
try {
Map<Integer, Long> map = null;
if (!this.activeCustomerMap.isEmpty()) {
map = this.activeCustomerMap;
this.activeCustomerMap = new ConcurrentHashMap<>();
}
return map;
} finally {
this.activeCustomerWriteLock.unlock();
}
}
}

View File

@@ -0,0 +1,84 @@
package ai.connectus.opensync.ovsdb;
import java.util.HashSet;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.vmware.ovsdb.service.OvsdbClient;
import ai.connectus.opensync.external.integration.OvsdbSession;
import ai.connectus.opensync.external.integration.OvsdbSessionMapInterface;
@Component
public class OvsdbSessionMap implements OvsdbSessionMapInterface {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbSessionMap.class);
private final ConcurrentHashMap<String, OvsdbSession> connectedClients = new ConcurrentHashMap<>();
@Override
public OvsdbSession getSession(String apId) {
return connectedClients.get(apId);
}
@Override
public OvsdbSession removeSession(String apId) {
return connectedClients.remove(apId);
}
@Override
public void closeSession(String apId) {
try {
connectedClients.get(apId).getOvsdbClient().shutdown();
connectedClients.remove(apId);
LOG.info("Closed ovsdb session for {}", apId);
}catch (Exception e) {
// do nothing
}
}
@Override
public OvsdbSession newSession(String apId, OvsdbClient ovsdbClient) {
OvsdbSession ret = new OvsdbSession();
ret.setApId(apId);
ret.setOvsdbClient(ovsdbClient);
OvsdbSession oldSession = connectedClients.put(apId, ret);
if(oldSession!=null) {
try {
oldSession.getOvsdbClient().shutdown();
LOG.info("Closed old ovsdb session for {}", apId);
}catch (Exception e) {
// do nothing
}
}
LOG.info("Created new ovsdb session for {}", apId);
return ret;
}
@Override
public int getNumSessions() {
return connectedClients.size();
}
@Override
public Set<String> getConnectedClientIds() {
return new HashSet<String>(connectedClients.keySet());
}
@Override
public String lookupClientId(OvsdbClient ovsdbClient) {
String key = connectedClients.searchEntries(1,
(Entry<String, OvsdbSession> t) -> { return t.getValue().getOvsdbClient().equals(ovsdbClient) ? t.getKey() : null ;}
);
return key;
}
}