mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-11-03 20:17:53 +00:00 
			
		
		
		
	Compare commits
	
		
			9 Commits
		
	
	
		
			change_ses
			...
			WIFI-3216
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					9cc501a8b9 | ||
| 
						 | 
					b720f9882d | ||
| 
						 | 
					91f113505d | ||
| 
						 | 
					14adaaacbe | ||
| 
						 | 
					11f0aa8856 | ||
| 
						 | 
					cbd27b37bf | ||
| 
						 | 
					80834c0a31 | ||
| 
						 | 
					45642464fc | ||
| 
						 | 
					be9d880bc3 | 
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -209,9 +209,9 @@ public class OpensyncCloudGatewayController {
 | 
			
		||||
                    case CellSizeAttributesRequest:
 | 
			
		||||
                        ret.add(sendCellSizeRequest(session, (CEGWCellSizeAttributesRequest) command));
 | 
			
		||||
                        break;
 | 
			
		||||
                    case MostRecentStatsTimestamp:
 | 
			
		||||
                        ret.add(sendGetMostRecentStatsTimestampRequest(command, inventoryId));
 | 
			
		||||
                        break;
 | 
			
		||||
//                    case MostRecentStatsTimestamp:
 | 
			
		||||
//                        ret.add(sendGetMostRecentStatsTimestampRequest(command, inventoryId));
 | 
			
		||||
//                        break;
 | 
			
		||||
                    default:
 | 
			
		||||
                        LOG.warn("[{}] Failed to deliver command {}, unsupported command type", inventoryId, command);
 | 
			
		||||
                        ret.add(new EquipmentCommandResponse(CEGWCommandResultCode.UnsupportedCommand,
 | 
			
		||||
@@ -250,32 +250,7 @@ public class OpensyncCloudGatewayController {
 | 
			
		||||
        return new GatewayDefaults();
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    private EquipmentCommandResponse sendGetMostRecentStatsTimestampRequest(CEGWBaseCommand command, String inventoryId) {
 | 
			
		||||
        Long ts = lastReceivedStatsTimestamp(inventoryId);
 | 
			
		||||
        if (ts == null) {
 | 
			
		||||
            return new EquipmentCommandResponse(CEGWCommandResultCode.NoRouteToCE,
 | 
			
		||||
                    null, command,
 | 
			
		||||
                    registeredGateway == null ? null : registeredGateway.getHostname(),
 | 
			
		||||
                            registeredGateway == null ? -1 : registeredGateway.getPort());
 | 
			
		||||
        } else {
 | 
			
		||||
            return new EquipmentCommandResponse(CEGWCommandResultCode.Success,
 | 
			
		||||
                    ts.toString(), command,
 | 
			
		||||
                    registeredGateway == null ? null : registeredGateway.getHostname(),
 | 
			
		||||
                            registeredGateway == null ? -1 : registeredGateway.getPort());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    @RequestMapping(value = "/lastReceivedStatsTimestamp", method = RequestMethod.GET)
 | 
			
		||||
    public Long lastReceivedStatsTimestamp(@RequestParam String apId) {
 | 
			
		||||
        Long ret = null;
 | 
			
		||||
        if (ovsdbSessionMapInterface.getSession(apId) != null) {
 | 
			
		||||
            ret = ovsdbSessionMapInterface.getSession(apId).getMostRecentStatsTimestamp();
 | 
			
		||||
            LOG.debug("lastReceivedStatsTimestamp for apId {} {}",apId,ret);
 | 
			
		||||
        } else {
 | 
			
		||||
            LOG.warn("lastReceivedStatsTimestamp found no session for {}, cannot get timestamp",apId);
 | 
			
		||||
        }
 | 
			
		||||
        return ret;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Verify a route to customer equipment
 | 
			
		||||
 
 | 
			
		||||
@@ -5,7 +5,6 @@ import java.net.Inet4Address;
 | 
			
		||||
import java.net.Inet6Address;
 | 
			
		||||
import java.net.InetAddress;
 | 
			
		||||
import java.net.UnknownHostException;
 | 
			
		||||
import java.text.DecimalFormat;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.Date;
 | 
			
		||||
@@ -52,7 +51,6 @@ import com.telecominfraproject.wlan.equipment.EquipmentServiceInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.equipment.models.Equipment;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSession;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSessionMapInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.controller.OpensyncCloudGatewayController;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.util.OvsdbToWlanCloudTypeMappingUtility;
 | 
			
		||||
import com.telecominfraproject.wlan.profile.ProfileServiceInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.profile.models.Profile;
 | 
			
		||||
@@ -140,8 +138,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
			
		||||
    private RealtimeEventPublisher realtimeEventPublisher;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private AlarmServiceInterface alarmServiceInterface;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private OpensyncCloudGatewayController gatewayController;
 | 
			
		||||
    
 | 
			
		||||
    @Value("${tip.wlan.mqttStatsPublisher.temperatureThresholdInC:80}")
 | 
			
		||||
    private int temperatureThresholdInC;
 | 
			
		||||
@@ -161,6 +157,7 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
			
		||||
    @Override
 | 
			
		||||
    @Async
 | 
			
		||||
    public void processMqttMessage(String topic, Report report) {
 | 
			
		||||
 | 
			
		||||
        long startTime = System.nanoTime();
 | 
			
		||||
        String apId = extractApIdFromTopic(topic);
 | 
			
		||||
        LOG.info("Received report on topic {} for ap {}", topic, report.getNodeID());
 | 
			
		||||
@@ -176,7 +173,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
			
		||||
        long profileId = ce.getProfileId();
 | 
			
		||||
        
 | 
			
		||||
        // update timestamp for active customer equipment
 | 
			
		||||
        gatewayController.updateActiveCustomer(customerId);
 | 
			
		||||
        List<ServiceMetric> metricRecordList = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
@@ -212,8 +208,11 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
			
		||||
 | 
			
		||||
            if (!metricRecordList.isEmpty()) {
 | 
			
		||||
                long serviceMetricTimestamp = System.currentTimeMillis();
 | 
			
		||||
                metricRecordList.stream().forEach(smr -> {              
 | 
			
		||||
                    smr.setCreatedTimestamp(serviceMetricTimestamp);
 | 
			
		||||
                metricRecordList.stream().forEach(smr -> {  
 | 
			
		||||
                	// TODO use serviceMetricTimestamp rather than 0. This is done for now since there are some
 | 
			
		||||
                	// channel metrics that have overlapping keys which messes up Cassandra if the same time stamp is used
 | 
			
		||||
                	// and setting it to 0 allows the CloudEventDispatcherController to assign unique time stamps. 
 | 
			
		||||
                    smr.setCreatedTimestamp(0);
 | 
			
		||||
                    if (smr.getLocationId() == 0)
 | 
			
		||||
                        smr.setLocationId(locationId);
 | 
			
		||||
                    if (smr.getCustomerId() == 0)
 | 
			
		||||
@@ -266,7 +265,6 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void publishSystemEventFromTableStateMonitor(SystemEvent event) {
 | 
			
		||||
        LOG.info("Publishing SystemEvent received by TableStateMonitor {}", event);
 | 
			
		||||
        gatewayController.updateActiveCustomer(event.getCustomerId());
 | 
			
		||||
        cloudEventDispatcherInterface.publishEvent(event);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -176,7 +176,7 @@ public class OpensyncExternalIntegrationCloudTest {
 | 
			
		||||
        location.setDetails(details);
 | 
			
		||||
        location.setName("Location-UT");
 | 
			
		||||
        location.setLocationType(LocationType.BUILDING);
 | 
			
		||||
        Mockito.when(locationServiceInterface.get(8L)).thenReturn(location);
 | 
			
		||||
        Mockito.when(locationServiceInterface.getOrNull(Mockito.anyLong())).thenReturn(location);
 | 
			
		||||
        Customer customer = new Customer();
 | 
			
		||||
        customer.setId(2);
 | 
			
		||||
        CustomerDetails customerDetails = new CustomerDetails();
 | 
			
		||||
@@ -232,7 +232,7 @@ public class OpensyncExternalIntegrationCloudTest {
 | 
			
		||||
        opensyncExternalIntegrationCloud.apConnected("Test_Client_21P10C68818122", createConnectNodeInfo());
 | 
			
		||||
 | 
			
		||||
        Mockito.verify(firmwareServiceInterface).getDefaultCustomerTrackSetting();
 | 
			
		||||
        Mockito.verify(locationServiceInterface).get(8L);
 | 
			
		||||
        Mockito.verify(locationServiceInterface).getOrNull(ArgumentMatchers.anyLong());
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -246,7 +246,7 @@ public class OpensyncExternalIntegrationCloudTest {
 | 
			
		||||
        location.setDetails(details);
 | 
			
		||||
        location.setName("Location-UT");
 | 
			
		||||
        location.setLocationType(LocationType.BUILDING);
 | 
			
		||||
        Mockito.when(locationServiceInterface.get(8L)).thenReturn(location);
 | 
			
		||||
        Mockito.when(locationServiceInterface.getOrNull(8L)).thenReturn(location);
 | 
			
		||||
 | 
			
		||||
        Customer customer = new Customer();
 | 
			
		||||
        customer.setId(2);
 | 
			
		||||
@@ -326,7 +326,7 @@ public class OpensyncExternalIntegrationCloudTest {
 | 
			
		||||
        Mockito.verify(customerServiceInterface).getOrNull(ArgumentMatchers.anyInt());
 | 
			
		||||
        Mockito.verify(equipmentServiceInterface).getByInventoryIdOrNull("Test_Client_21P10C68818122");
 | 
			
		||||
        Mockito.verify(firmwareServiceInterface).getDefaultCustomerTrackSetting();
 | 
			
		||||
        Mockito.verify(locationServiceInterface, Mockito.times(2)).get(ArgumentMatchers.anyLong());
 | 
			
		||||
        Mockito.verify(locationServiceInterface, Mockito.times(2)).getOrNull(ArgumentMatchers.anyLong());
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -54,4 +54,6 @@ public interface OpensyncExternalIntegrationInterface {
 | 
			
		||||
    void nodeStateDbTableUpdate(List<Map<String, String>> nodeStateAttributes, String apId);
 | 
			
		||||
 | 
			
		||||
    void clearEquipmentStatus(String apId);
 | 
			
		||||
    
 | 
			
		||||
    void processMqttMessage(String topic, Report report);
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,5 @@
 | 
			
		||||
package com.telecominfraproject.wlan.opensync.external.integration;
 | 
			
		||||
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
 | 
			
		||||
import com.vmware.ovsdb.service.OvsdbClient;
 | 
			
		||||
 | 
			
		||||
public class OvsdbSession {
 | 
			
		||||
@@ -9,7 +7,6 @@ public class OvsdbSession {
 | 
			
		||||
    private String apId;
 | 
			
		||||
    private long routingId;
 | 
			
		||||
    private long equipmentId;
 | 
			
		||||
    private long mostRecentStatsTimestamp;
 | 
			
		||||
    
 | 
			
		||||
    public OvsdbClient getOvsdbClient() {
 | 
			
		||||
        return ovsdbClient;
 | 
			
		||||
@@ -35,34 +32,5 @@ public class OvsdbSession {
 | 
			
		||||
    public void setEquipmentId(long equipmentId) {
 | 
			
		||||
        this.equipmentId = equipmentId;
 | 
			
		||||
    }
 | 
			
		||||
    public long getMostRecentStatsTimestamp() {
 | 
			
		||||
        return mostRecentStatsTimestamp;
 | 
			
		||||
    }
 | 
			
		||||
    public void setMostRecentStatsTimestamp(long mostRecentStatsTimestamp) {
 | 
			
		||||
        this.mostRecentStatsTimestamp = mostRecentStatsTimestamp;
 | 
			
		||||
    }
 | 
			
		||||
    @Override
 | 
			
		||||
    public int hashCode() {
 | 
			
		||||
        return Objects.hash(apId, equipmentId, mostRecentStatsTimestamp, ovsdbClient, routingId);
 | 
			
		||||
    }
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean equals(Object obj) {
 | 
			
		||||
        if (this == obj)
 | 
			
		||||
            return true;
 | 
			
		||||
        if (obj == null)
 | 
			
		||||
            return false;
 | 
			
		||||
        if (getClass() != obj.getClass())
 | 
			
		||||
            return false;
 | 
			
		||||
        OvsdbSession other = (OvsdbSession) obj;
 | 
			
		||||
        return Objects.equals(apId, other.apId) && equipmentId == other.equipmentId && mostRecentStatsTimestamp == other.mostRecentStatsTimestamp
 | 
			
		||||
                && Objects.equals(ovsdbClient, other.ovsdbClient) && routingId == other.routingId;
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    @Override
 | 
			
		||||
    public String toString() {
 | 
			
		||||
        return "OvsdbSession [ovsdbClient=" + ovsdbClient + ", apId=" + apId + ", routingId=" + routingId + ", equipmentId=" + equipmentId
 | 
			
		||||
                + ", mostRecentStatsTimestamp=" + mostRecentStatsTimestamp + "]";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
 
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -11,10 +11,6 @@ import org.fusesource.mqtt.client.MQTT;
 | 
			
		||||
import org.fusesource.mqtt.client.Message;
 | 
			
		||||
import org.fusesource.mqtt.client.QoS;
 | 
			
		||||
import org.fusesource.mqtt.client.Topic;
 | 
			
		||||
import org.fusesource.mqtt.client.Tracer;
 | 
			
		||||
import org.fusesource.mqtt.codec.MQTTFrame;
 | 
			
		||||
import org.fusesource.mqtt.codec.PINGREQ;
 | 
			
		||||
import org.fusesource.mqtt.codec.PINGRESP;
 | 
			
		||||
import org.slf4j.Logger;
 | 
			
		||||
import org.slf4j.LoggerFactory;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
@@ -36,8 +32,7 @@ import com.netflix.servo.monitor.Stopwatch;
 | 
			
		||||
import com.netflix.servo.monitor.Timer;
 | 
			
		||||
import com.netflix.servo.tag.TagList;
 | 
			
		||||
import com.telecominfraproject.wlan.cloudmetrics.CloudMetricsTags;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSession;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.OvsdbSessionMapInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.external.integration.utils.StatsPublisherInterface;
 | 
			
		||||
import com.telecominfraproject.wlan.opensync.util.ZlibUtil;
 | 
			
		||||
 | 
			
		||||
@@ -63,10 +58,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
			
		||||
    private final Timer timerMessageProcess = new BasicTimer(MonitorConfig.builder("osgw-mqtt-messageProcessTimer").withTags(tags).build());
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private StatsPublisherInterface statsPublisher;
 | 
			
		||||
    
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private OvsdbSessionMapInterface ovsdbSessionMapInterface;
 | 
			
		||||
    private OpensyncExternalIntegrationInterface opensyncExternalIntegrationInterface;
 | 
			
		||||
 | 
			
		||||
    // dtop: use anonymous constructor to ensure that the following code always
 | 
			
		||||
    // get executed,
 | 
			
		||||
@@ -193,18 +185,8 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
			
		||||
                                // Only supported protobuf on the TIP opensync APs is Report
 | 
			
		||||
                                Report statsReport = Report.parseFrom(payload);
 | 
			
		||||
                                mqttMsg.ack();
 | 
			
		||||
                                String apId = extractApIdFromTopic(mqttMsg.getTopic());
 | 
			
		||||
                                if (apId != null) {
 | 
			
		||||
                                    OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(extractApIdFromTopic(mqttMsg.getTopic()));
 | 
			
		||||
                                    if (ovsdbSession != null) {
 | 
			
		||||
                                        ovsdbSession.setMostRecentStatsTimestamp(System.currentTimeMillis());
 | 
			
		||||
                                        LOG.debug("Last metrics received from AP updated to {}",ovsdbSession.toString());
 | 
			
		||||
                                    } else {
 | 
			
		||||
                                        LOG.debug("No ovsdb session exists for this AP {}",apId);
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                MQTT_LOG.info("Topic {}\n{}", mqttMsg.getTopic(), jsonPrinter.print(statsReport));
 | 
			
		||||
                                statsPublisher.processMqttMessage(mqttMsg.getTopic(), statsReport);                           
 | 
			
		||||
                                opensyncExternalIntegrationInterface.processMqttMessage(mqttMsg.getTopic(), statsReport);                           
 | 
			
		||||
                                LOG.debug("Dispatched report for topic {} to backend for processing", mqttMsg.getTopic());
 | 
			
		||||
 | 
			
		||||
                            } catch (Exception e) {
 | 
			
		||||
@@ -244,26 +226,5 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
			
		||||
            mqttClientThread.interrupt();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * @param topic
 | 
			
		||||
     * @return apId extracted from the topic name, or null if it cannot be
 | 
			
		||||
     *         extracted
 | 
			
		||||
     */
 | 
			
		||||
    static String extractApIdFromTopic(String topic) {
 | 
			
		||||
        // Topic is formatted as
 | 
			
		||||
        // "/ap/"+clientCn+"_"+ret.serialNumber+"/opensync"
 | 
			
		||||
        if (topic == null) {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        String[] parts = topic.split("/");
 | 
			
		||||
        if (parts.length < 3) {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // apId is the third element in the topic
 | 
			
		||||
        return parts[2];
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user