mirror of
				https://github.com/Telecominfraproject/wlan-cloud-opensync-controller.git
				synced 2025-11-03 20:17:53 +00:00 
			
		
		
		
	Compare commits
	
		
			5 Commits
		
	
	
		
			WIFI-7708-
			...
			release/v1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					495487ae8f | ||
| 
						 | 
					4e8164fb6d | ||
| 
						 | 
					4b6a18df9f | ||
| 
						 | 
					8724c4521c | ||
| 
						 | 
					583d8e1e4c | 
@@ -25,6 +25,8 @@ import java.util.regex.Matcher;
 | 
				
			|||||||
import java.util.regex.Pattern;
 | 
					import java.util.regex.Pattern;
 | 
				
			||||||
import java.util.stream.Collectors;
 | 
					import java.util.stream.Collectors;
 | 
				
			||||||
import java.util.Arrays;
 | 
					import java.util.Arrays;
 | 
				
			||||||
 | 
					import java.util.Collections;
 | 
				
			||||||
 | 
					import java.util.Comparator;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import org.slf4j.Logger;
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
import org.slf4j.LoggerFactory;
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
@@ -136,6 +138,7 @@ import sts.OpensyncStats.Report;
 | 
				
			|||||||
@Component
 | 
					@Component
 | 
				
			||||||
public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegrationInterface {
 | 
					public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegrationInterface {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public static final String DERIVED_SESSION_ID_PREFIX = "DERIVED-";
 | 
				
			||||||
    protected static final String standard_linux_date_format = "EEE MMM dd HH:mm:ss zzz yyyy";
 | 
					    protected static final String standard_linux_date_format = "EEE MMM dd HH:mm:ss zzz yyyy";
 | 
				
			||||||
    private static final String VLAN_TRUNK_IF_TYPE = "vlan_trunk";
 | 
					    private static final String VLAN_TRUNK_IF_TYPE = "vlan_trunk";
 | 
				
			||||||
    private static final String ALLOWED_VLANS = "allowed_vlans";
 | 
					    private static final String ALLOWED_VLANS = "allowed_vlans";
 | 
				
			||||||
@@ -886,23 +889,17 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void apDisconnected(String apId) {
 | 
					    public void apDisconnected(String apId, Long ctxRoutingId) {
 | 
				
			||||||
        LOG.info("AP {} got disconnected from the gateway", apId);
 | 
					        LOG.info("AP {} got disconnected from the gateway, remove ctxRoutingId {} ", apId, ctxRoutingId);
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
 | 
					            if (ctxRoutingId != null && ctxRoutingId > 0L) {
 | 
				
			||||||
            OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(apId);
 | 
					                try {
 | 
				
			||||||
 | 
					                    routingServiceInterface.delete(ctxRoutingId);
 | 
				
			||||||
            if (ovsdbSession != null) {
 | 
					                } catch (Exception e) {
 | 
				
			||||||
                if (ovsdbSession.getRoutingId() > 0L) {
 | 
					                    LOG.warn("Unable to delete routing service Id {} for ap {}. {}", ctxRoutingId, apId, e);
 | 
				
			||||||
                    try {
 | 
					 | 
				
			||||||
                        routingServiceInterface.delete(ovsdbSession.getRoutingId());
 | 
					 | 
				
			||||||
                    } catch (Exception e) {
 | 
					 | 
				
			||||||
                        LOG.warn("Unable to delete routing service Id {} for ap {}. {}", ovsdbSession.getRoutingId(), apId, e);
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
                LOG.warn("Cannot find ap {} in inventory", apId);
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId);
 | 
					            Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId);
 | 
				
			||||||
            if (ce != null) {
 | 
					            if (ce != null) {
 | 
				
			||||||
                List<Status> deletedStatuses = statusServiceInterface.deleteOnEquipmentDisconnect(ce.getCustomerId(), ce.getId());
 | 
					                List<Status> deletedStatuses = statusServiceInterface.deleteOnEquipmentDisconnect(ce.getCustomerId(), ce.getId());
 | 
				
			||||||
@@ -918,6 +915,47 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public Long getLatestRoutingId(String apId) {
 | 
				
			||||||
 | 
					        try {
 | 
				
			||||||
 | 
					            Equipment ce = equipmentServiceInterface.getByInventoryIdOrNull(apId);
 | 
				
			||||||
 | 
					            if (ce != null) {
 | 
				
			||||||
 | 
					                List<EquipmentRoutingRecord> recordList = routingServiceInterface.getRegisteredRouteList(ce.getId());
 | 
				
			||||||
 | 
					                LOG.info("{} routing records found for AP {}, sort and return latest routingRecordId", recordList.size(), apId);
 | 
				
			||||||
 | 
					                if (!recordList.isEmpty()) {
 | 
				
			||||||
 | 
					                    // Sort by latest record first
 | 
				
			||||||
 | 
					                    Collections.sort(recordList, new Comparator<EquipmentRoutingRecord>() {
 | 
				
			||||||
 | 
					                        @Override
 | 
				
			||||||
 | 
					                        public int compare(EquipmentRoutingRecord o1, EquipmentRoutingRecord o2) {
 | 
				
			||||||
 | 
					                            return Long.compare(o2.getLastModifiedTimestamp(), o1.getLastModifiedTimestamp());
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    });
 | 
				
			||||||
 | 
					                    return recordList.get(0).getId();
 | 
				
			||||||
 | 
					                } else {
 | 
				
			||||||
 | 
					                    LOG.debug("No records found for AP {} equipmentId {}", apId, ce.getId());
 | 
				
			||||||
 | 
					                    return null;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                // Equipment doesn't exist, nothing to clean up
 | 
				
			||||||
 | 
					                LOG.debug("No equipment found for AP {} ", apId);
 | 
				
			||||||
 | 
					                return null;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        } catch (Exception e) {
 | 
				
			||||||
 | 
					            LOG.error("Exception when registering ap routing {}", apId, e);
 | 
				
			||||||
 | 
					            // Equipment doesn't exist, nothing to clean up
 | 
				
			||||||
 | 
					            return null;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void removeRoutingRecord(Long routingId) {
 | 
				
			||||||
 | 
					        try {
 | 
				
			||||||
 | 
					            routingServiceInterface.delete(routingId);
 | 
				
			||||||
 | 
					        } catch (Exception e) {
 | 
				
			||||||
 | 
					            LOG.warn("Unable to delete routing service Id {}. {}", routingId, e);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void updateApDisconnectedStatus(String apId, Equipment ce) {
 | 
					    private void updateApDisconnectedStatus(String apId, Equipment ce) {
 | 
				
			||||||
        LOG.info("updateApDisconnectedStatus disconnected AP {}", apId);
 | 
					        LOG.info("updateApDisconnectedStatus disconnected AP {}", apId);
 | 
				
			||||||
@@ -1788,7 +1826,7 @@ public class OpensyncExternalIntegrationCloud implements OpensyncExternalIntegra
 | 
				
			|||||||
                clientSession.setLocationId(ce.getLocationId());
 | 
					                clientSession.setLocationId(ce.getLocationId());
 | 
				
			||||||
                clientSession.setDetails(new ClientSessionDetails());
 | 
					                clientSession.setDetails(new ClientSessionDetails());
 | 
				
			||||||
                long derivedSessionId = WiFiSessionUtility.encodeWiFiAssociationId(timestamp / 1000, clientInstance.getMacAddress().getAddressAsLong());
 | 
					                long derivedSessionId = WiFiSessionUtility.encodeWiFiAssociationId(timestamp / 1000, clientInstance.getMacAddress().getAddressAsLong());
 | 
				
			||||||
                clientSession.getDetails().setSessionId(Long.toUnsignedString(derivedSessionId));
 | 
					                clientSession.getDetails().setSessionId(DERIVED_SESSION_ID_PREFIX + Long.toUnsignedString(derivedSessionId));
 | 
				
			||||||
                clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(derivedSessionId)));
 | 
					                clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(derivedSessionId)));
 | 
				
			||||||
                clientSession.getDetails().setAssociationState(AssociationState._802_11_Associated);
 | 
					                clientSession.getDetails().setAssociationState(AssociationState._802_11_Associated);
 | 
				
			||||||
                clientSession.getDetails().setIsReassociation(false);
 | 
					                clientSession.getDetails().setIsReassociation(false);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,6 +21,7 @@ import com.telecominfraproject.wlan.client.session.models.ClientSession;
 | 
				
			|||||||
import com.telecominfraproject.wlan.client.session.models.ClientSessionDetails;
 | 
					import com.telecominfraproject.wlan.client.session.models.ClientSessionDetails;
 | 
				
			||||||
import com.telecominfraproject.wlan.cloudeventdispatcher.CloudEventDispatcherInterface;
 | 
					import com.telecominfraproject.wlan.cloudeventdispatcher.CloudEventDispatcherInterface;
 | 
				
			||||||
import com.telecominfraproject.wlan.core.model.equipment.MacAddress;
 | 
					import com.telecominfraproject.wlan.core.model.equipment.MacAddress;
 | 
				
			||||||
 | 
					import com.telecominfraproject.wlan.opensync.external.integration.OpensyncExternalIntegrationCloud;
 | 
				
			||||||
import com.telecominfraproject.wlan.opensync.util.OvsdbToWlanCloudTypeMappingUtility;
 | 
					import com.telecominfraproject.wlan.opensync.util.OvsdbToWlanCloudTypeMappingUtility;
 | 
				
			||||||
import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric;
 | 
					import com.telecominfraproject.wlan.servicemetric.models.ServiceMetric;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -105,10 +106,10 @@ public class AsyncPublishService {
 | 
				
			|||||||
                    processClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession);
 | 
					                    processClientAssocEvent(customerId, equipmentId, locationId, apEventClientSession);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                if (apEventClientSession.hasClientIpEvent()) {
 | 
					                if (apEventClientSession.hasClientIpEvent()) {
 | 
				
			||||||
                    processClientIpEvent(customerId, equipmentId, locationId, apEventClientSession);
 | 
					                    processClientIpEvent(apId, customerId, equipmentId, locationId, apEventClientSession);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                if (apEventClientSession.hasClientDisconnectEvent()) {
 | 
					                if (apEventClientSession.hasClientDisconnectEvent()) {
 | 
				
			||||||
                    processClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession);
 | 
					                    processClientDisconnectEvent(apId, customerId, equipmentId, locationId, apEventClientSession);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            realtimeEventPublisher.publishChannelHopEvents(customerId, equipmentId, locationId, eventReport);
 | 
					            realtimeEventPublisher.publishChannelHopEvents(customerId, equipmentId, locationId, eventReport);
 | 
				
			||||||
@@ -238,7 +239,7 @@ public class AsyncPublishService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    private void processClientIpEvent(int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) {
 | 
					    private void processClientIpEvent(String apId, int customerId, long equipmentId, long locationId, sts.OpensyncStats.EventReport.ClientSession apEventClientSession) {
 | 
				
			||||||
        ClientIpEvent apClientEvent = apEventClientSession.getClientIpEvent();
 | 
					        ClientIpEvent apClientEvent = apEventClientSession.getClientIpEvent();
 | 
				
			||||||
        com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac()));
 | 
					        com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac()));
 | 
				
			||||||
        if (client == null) {
 | 
					        if (client == null) {
 | 
				
			||||||
@@ -263,12 +264,16 @@ public class AsyncPublishService {
 | 
				
			|||||||
        if (clientSession.getDetails().getPriorEquipmentId() == null) {
 | 
					        if (clientSession.getDetails().getPriorEquipmentId() == null) {
 | 
				
			||||||
            clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId());
 | 
					            clientSession.getDetails().setPriorEquipmentId(clientSession.getEquipmentId());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (clientSession.getDetails().getPriorSessionId() == null) {
 | 
					        
 | 
				
			||||||
            if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId())))
 | 
					        String sessionId = clientSession.getDetails().getSessionId();
 | 
				
			||||||
                clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId());
 | 
					        if (!Objects.equal(sessionId, Long.toUnsignedString(apEventClientSession.getSessionId()))) {
 | 
				
			||||||
        }
 | 
					            if (sessionId != null && !sessionId.startsWith(OpensyncExternalIntegrationCloud.DERIVED_SESSION_ID_PREFIX)) {
 | 
				
			||||||
        if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) {
 | 
					                LOG.info("Ignored clientIpEvent for different session ID for AP {}, clientMac {}: currentSessionId {}, ipSessionID {} ", apId,
 | 
				
			||||||
            clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId());
 | 
					                        apClientEvent.getStaMac(), sessionId, apEventClientSession.getSessionId());
 | 
				
			||||||
 | 
					                realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent());
 | 
				
			||||||
 | 
					                return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            clientSession.getDetails().setPriorSessionId(sessionId);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId()));
 | 
					        clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId()));
 | 
				
			||||||
        if (apClientEvent.hasIpAddr()) {
 | 
					        if (apClientEvent.hasIpAddr()) {
 | 
				
			||||||
@@ -294,7 +299,7 @@ public class AsyncPublishService {
 | 
				
			|||||||
        realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent());
 | 
					        realtimeEventPublisher.publishClientIpEvent(customerId, equipmentId, locationId, apEventClientSession.getClientIpEvent());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    private void processClientDisconnectEvent(int customerId, long equipmentId, long locationId,
 | 
					    private void processClientDisconnectEvent(String apId, int customerId, long equipmentId, long locationId,
 | 
				
			||||||
            sts.OpensyncStats.EventReport.ClientSession apEventClientSession) {
 | 
					            sts.OpensyncStats.EventReport.ClientSession apEventClientSession) {
 | 
				
			||||||
        ClientDisconnectEvent apClientEvent = apEventClientSession.getClientDisconnectEvent();
 | 
					        ClientDisconnectEvent apClientEvent = apEventClientSession.getClientDisconnectEvent();
 | 
				
			||||||
        com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac()));
 | 
					        com.telecominfraproject.wlan.client.models.Client client = clientServiceInterface.getOrNull(customerId, MacAddress.valueOf(apClientEvent.getStaMac()));
 | 
				
			||||||
@@ -316,12 +321,15 @@ public class AsyncPublishService {
 | 
				
			|||||||
            clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId())));
 | 
					            clientSession.getDetails().setDhcpDetails(new ClientDhcpDetails(Long.toUnsignedString(apEventClientSession.getSessionId())));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (clientSession.getDetails().getPriorSessionId() == null) {
 | 
					        String sessionId = clientSession.getDetails().getSessionId();
 | 
				
			||||||
            if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId())))
 | 
					        if (!Objects.equal(sessionId, Long.toUnsignedString(apEventClientSession.getSessionId()))) {
 | 
				
			||||||
                clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId());
 | 
					            if (sessionId != null && !sessionId.startsWith(OpensyncExternalIntegrationCloud.DERIVED_SESSION_ID_PREFIX)) {
 | 
				
			||||||
        }
 | 
					                LOG.info("Ignored clientDisconnectEvent for different session ID for AP {}, clientMac {}: currentSessionId {}, disconnectSessionID {} ", apId,
 | 
				
			||||||
        if (!Objects.equal(clientSession.getDetails().getSessionId(), Long.toUnsignedString(apEventClientSession.getSessionId()))) {
 | 
					                        apClientEvent.getStaMac(), sessionId, apEventClientSession.getSessionId());
 | 
				
			||||||
            clientSession.getDetails().setPriorSessionId(clientSession.getDetails().getSessionId());
 | 
					                realtimeEventPublisher.publishClientDisconnectEvent(customerId, equipmentId, locationId, apEventClientSession.getClientDisconnectEvent());
 | 
				
			||||||
 | 
					                return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            clientSession.getDetails().setPriorSessionId(sessionId);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId()));
 | 
					        clientSession.getDetails().setSessionId(Long.toUnsignedString(apEventClientSession.getSessionId()));
 | 
				
			||||||
        clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand()));
 | 
					        clientSession.getDetails().setRadioType(OvsdbToWlanCloudTypeMappingUtility.getRadioTypeFromOpensyncStatsRadioBandType(apClientEvent.getBand()));
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1190,12 +1190,12 @@ public class MqttStatsPublisher implements StatsPublisherInterface {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        String[] parts = topic.split("/");
 | 
					        String[] parts = topic.split("/");
 | 
				
			||||||
        if (parts.length < 3) {
 | 
					        if (parts.length < 4) {
 | 
				
			||||||
            return null;
 | 
					            return null;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // apId is the third element in the topic
 | 
					        // apId is the third element in the topic
 | 
				
			||||||
        return parts[2];
 | 
					        return parts[3];
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -394,7 +394,7 @@ public class OpensyncExternalIntegrationCloudTest {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void testApDisconnected() {
 | 
					    public void testApDisconnected() {
 | 
				
			||||||
        opensyncExternalIntegrationCloud.apDisconnected("Test_Client_21P10C68818122");
 | 
					        opensyncExternalIntegrationCloud.apDisconnected("Test_Client_21P10C68818122", 0L);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -22,7 +22,11 @@ public interface OpensyncExternalIntegrationInterface {
 | 
				
			|||||||
        INIT, INSERT, DELETE, MODIFY
 | 
					        INIT, INSERT, DELETE, MODIFY
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void apDisconnected(String apId);
 | 
					    void apDisconnected(String apId, Long ctxRoutingId);
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    Long getLatestRoutingId(String apId);
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    void removeRoutingRecord(Long routingId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    OpensyncAPConfig getApConfig(String apId);
 | 
					    OpensyncAPConfig getApConfig(String apId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,5 +1,7 @@
 | 
				
			|||||||
package com.telecominfraproject.wlan.opensync.external.integration;
 | 
					package com.telecominfraproject.wlan.opensync.external.integration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.concurrent.atomic.AtomicInteger;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import com.vmware.ovsdb.service.OvsdbClient;
 | 
					import com.vmware.ovsdb.service.OvsdbClient;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public class OvsdbSession {
 | 
					public class OvsdbSession {
 | 
				
			||||||
@@ -7,6 +9,7 @@ public class OvsdbSession {
 | 
				
			|||||||
    private String apId;
 | 
					    private String apId;
 | 
				
			||||||
    private long routingId;
 | 
					    private long routingId;
 | 
				
			||||||
    private long equipmentId;
 | 
					    private long equipmentId;
 | 
				
			||||||
 | 
					    private AtomicInteger currentConfigNumInFlight = new AtomicInteger();
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    public OvsdbClient getOvsdbClient() {
 | 
					    public OvsdbClient getOvsdbClient() {
 | 
				
			||||||
        return ovsdbClient;
 | 
					        return ovsdbClient;
 | 
				
			||||||
@@ -32,5 +35,9 @@ public class OvsdbSession {
 | 
				
			|||||||
    public void setEquipmentId(long equipmentId) {
 | 
					    public void setEquipmentId(long equipmentId) {
 | 
				
			||||||
        this.equipmentId = equipmentId;
 | 
					        this.equipmentId = equipmentId;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    public AtomicInteger getCurrentConfigNumInFlight() {
 | 
				
			||||||
 | 
					        return currentConfigNumInFlight;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 
 | 
					 
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -64,7 +64,7 @@ public class OpensyncExternalIntegrationSimple implements OpensyncExternalIntegr
 | 
				
			|||||||
        LOG.info("ConnectNodeInfo {}", connectNodeInfo);
 | 
					        LOG.info("ConnectNodeInfo {}", connectNodeInfo);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void apDisconnected(String apId) {
 | 
					    public void apDisconnected(String apId, Long ctxRoutingId) {
 | 
				
			||||||
        LOG.info("AP {} got disconnected from the gateway", apId);
 | 
					        LOG.info("AP {} got disconnected from the gateway", apId);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -222,4 +222,15 @@ public class OpensyncExternalIntegrationSimple implements OpensyncExternalIntegr
 | 
				
			|||||||
    public void updateConfigVersionInStatus(String apId, long configVersionFromProfiles) {
 | 
					    public void updateConfigVersionInStatus(String apId, long configVersionFromProfiles) {
 | 
				
			||||||
        // do nothing here
 | 
					        // do nothing here
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public Long getLatestRoutingId(String apId) {
 | 
				
			||||||
 | 
					        LOG.info("getLatestRoutingId for AP {}", apId);
 | 
				
			||||||
 | 
					        return null;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void removeRoutingRecord(Long routingId) {
 | 
				
			||||||
 | 
					        LOG.info("removeRoutingRecord for routingId {}", routingId);        
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -60,6 +60,9 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
				
			|||||||
    @Autowired
 | 
					    @Autowired
 | 
				
			||||||
    private OpensyncExternalIntegrationInterface opensyncExternalIntegrationInterface;
 | 
					    private OpensyncExternalIntegrationInterface opensyncExternalIntegrationInterface;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Value("${tip.wlan.internalHostName:localhost}") 
 | 
				
			||||||
 | 
					    private String internalHostName;
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
    // dtop: use anonymous constructor to ensure that the following code always
 | 
					    // dtop: use anonymous constructor to ensure that the following code always
 | 
				
			||||||
    // get executed,
 | 
					    // get executed,
 | 
				
			||||||
    // even when somebody adds another constructor in here
 | 
					    // even when somebody adds another constructor in here
 | 
				
			||||||
@@ -138,7 +141,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
				
			|||||||
                        MQTT mqtt = new MQTT();
 | 
					                        MQTT mqtt = new MQTT();
 | 
				
			||||||
                        mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort);
 | 
					                        mqtt.setHost("tls://" + mqttBrokerAddress + ":" + mqttBrokerListenPort);
 | 
				
			||||||
                        LOG.info("Connecting to MQTT broker at {}", mqtt.getHost());
 | 
					                        LOG.info("Connecting to MQTT broker at {}", mqtt.getHost());
 | 
				
			||||||
                        mqtt.setClientId("opensync_mqtt");
 | 
					                        mqtt.setClientId("opensync_mqtt_" + internalHostName);
 | 
				
			||||||
                        mqtt.setUserName(username);
 | 
					                        mqtt.setUserName(username);
 | 
				
			||||||
                        mqtt.setPassword(password);
 | 
					                        mqtt.setPassword(password);
 | 
				
			||||||
                        blockingConnection = mqtt.blockingConnection();
 | 
					                        blockingConnection = mqtt.blockingConnection();
 | 
				
			||||||
@@ -147,7 +150,7 @@ public class OpensyncMqttClient implements ApplicationListener<ContextClosedEven
 | 
				
			|||||||
                        LOG.debug("Connected to MQTT broker at {}", mqtt.getHost());
 | 
					                        LOG.debug("Connected to MQTT broker at {}", mqtt.getHost());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        // NB. setting to AT_MOST_ONCE to match the APs message level
 | 
					                        // NB. setting to AT_MOST_ONCE to match the APs message level
 | 
				
			||||||
                        Topic[] topics = {new Topic("/ap/#", QoS.AT_MOST_ONCE),};
 | 
					                        Topic[] topics = {new Topic("/ap/opensync_mqtt_" + internalHostName + "/#", QoS.AT_MOST_ONCE),};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        blockingConnection.subscribe(topics);
 | 
					                        blockingConnection.subscribe(topics);
 | 
				
			||||||
                        LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
 | 
					                        LOG.info("Subscribed to mqtt topics {}", Arrays.asList(topics));
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,6 +20,7 @@ import com.telecominfraproject.wlan.opensync.external.integration.models.*;
 | 
				
			|||||||
import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao;
 | 
					import com.telecominfraproject.wlan.opensync.ovsdb.dao.OvsdbDao;
 | 
				
			||||||
import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics;
 | 
					import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics;
 | 
				
			||||||
import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbMetrics;
 | 
					import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbMetrics;
 | 
				
			||||||
 | 
					import com.telecominfraproject.wlan.opensync.util.OvsdbClientUtil;
 | 
				
			||||||
import com.telecominfraproject.wlan.opensync.util.OvsdbStringConstants;
 | 
					import com.telecominfraproject.wlan.opensync.util.OvsdbStringConstants;
 | 
				
			||||||
import com.telecominfraproject.wlan.opensync.util.SslUtil;
 | 
					import com.telecominfraproject.wlan.opensync.util.SslUtil;
 | 
				
			||||||
import com.vmware.ovsdb.callback.ConnectionCallback;
 | 
					import com.vmware.ovsdb.callback.ConnectionCallback;
 | 
				
			||||||
@@ -141,26 +142,32 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface {
 | 
				
			|||||||
					subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName();
 | 
										subjectDn = ((X509Certificate) remoteCertificate).getSubjectDN().getName();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    String clientCn = SslUtil.extractCN(subjectDn);
 | 
					                    String clientCn = SslUtil.extractCN(subjectDn);
 | 
				
			||||||
                    LOG.info("ovsdbClient connecting from {} on port {} clientCn {}", remoteHost, localPort, clientCn);
 | 
					                    if (clientCn != null && !clientCn.isEmpty()) {
 | 
				
			||||||
 | 
					                        LOG.info("ovsdbClient connecting from {} on port {} clientCn {}", remoteHost, localPort, clientCn);
 | 
				
			||||||
                    ConnectNodeInfo connectNodeInfo = ovsdbDao.getConnectNodeInfo(ovsdbClient);
 | 
					    
 | 
				
			||||||
 | 
					                        ConnectNodeInfo connectNodeInfo = ovsdbDao.getConnectNodeInfo(ovsdbClient);
 | 
				
			||||||
                    // successfully connected - register it in our
 | 
					    
 | 
				
			||||||
                    // connectedClients table
 | 
					                        // successfully connected - register it in our
 | 
				
			||||||
 | 
					                        // connectedClients table
 | 
				
			||||||
                    String key = alterClientCnIfRequired(clientCn, connectNodeInfo);
 | 
					    
 | 
				
			||||||
                    ovsdbSessionMapInterface.newSession(key, ovsdbClient);
 | 
					                        String key = alterClientCnIfRequired(clientCn, connectNodeInfo);
 | 
				
			||||||
 | 
					                        OvsdbSession ovsdbSession = ovsdbSessionMapInterface.newSession(key, ovsdbClient);
 | 
				
			||||||
                    extIntegrationInterface.apConnected(key, connectNodeInfo);
 | 
					                        extIntegrationInterface.apConnected(key, connectNodeInfo);
 | 
				
			||||||
 | 
					                        // DT: at this point the routing Id is associated with the session, let's store it into the
 | 
				
			||||||
                    processConnectRequest(ovsdbClient, clientCn, connectNodeInfo);
 | 
					                        // connectionInfo object so that the disconnect handler has access to it
 | 
				
			||||||
 | 
					                        OvsdbClientUtil.setRoutingId(ovsdbClient, ovsdbSession.getRoutingId());
 | 
				
			||||||
                    monitorOvsdbStateTables(ovsdbClient, key);
 | 
					    
 | 
				
			||||||
 | 
					                        processConnectRequest(ovsdbClient, clientCn, connectNodeInfo);
 | 
				
			||||||
                    connectionsCreated.increment();
 | 
					    
 | 
				
			||||||
                    LOG.info("ovsdbClient connected from {} on port {} AP {} ", remoteHost, localPort, key);
 | 
					                        monitorOvsdbStateTables(ovsdbClient, key);
 | 
				
			||||||
                    LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
 | 
					    
 | 
				
			||||||
 | 
					                        connectionsCreated.increment();
 | 
				
			||||||
 | 
					                        LOG.info("ovsdbClient connected from {} on port {} AP {} ", remoteHost, localPort, key);
 | 
				
			||||||
 | 
					                        LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
 | 
				
			||||||
 | 
					                    } else {
 | 
				
			||||||
 | 
					                        LOG.debug("ovsdbClient: clientCn is empty - not connecting.");
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
                } catch (IllegalStateException e) {
 | 
					                } catch (IllegalStateException e) {
 | 
				
			||||||
                    connectionsFailed.increment();
 | 
					                    connectionsFailed.increment();
 | 
				
			||||||
                    LOG.error("autoprovisioning error {}", e.getMessage(), e);
 | 
					                    LOG.error("autoprovisioning error {}", e.getMessage(), e);
 | 
				
			||||||
@@ -181,6 +188,7 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface {
 | 
				
			|||||||
                connectionsDropped.increment();
 | 
					                connectionsDropped.increment();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                String remoteHost;
 | 
					                String remoteHost;
 | 
				
			||||||
 | 
					                int remotePort;
 | 
				
			||||||
                int localPort;
 | 
					                int localPort;
 | 
				
			||||||
                String clientCn;
 | 
					                String clientCn;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -196,6 +204,7 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                try {
 | 
					                try {
 | 
				
			||||||
                    remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress();
 | 
					                    remoteHost = ovsdbClient.getConnectionInfo().getRemoteAddress().getHostAddress();
 | 
				
			||||||
 | 
					                    remotePort = ovsdbClient.getConnectionInfo().getRemotePort();
 | 
				
			||||||
                    localPort = ovsdbClient.getConnectionInfo().getLocalPort();
 | 
					                    localPort = ovsdbClient.getConnectionInfo().getLocalPort();
 | 
				
			||||||
                    String subjectDn = null;
 | 
					                    String subjectDn = null;
 | 
				
			||||||
                    try {
 | 
					                    try {
 | 
				
			||||||
@@ -206,17 +215,63 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface {
 | 
				
			|||||||
                        // do nothing
 | 
					                        // do nothing
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    clientCn = SslUtil.extractCN(subjectDn);
 | 
					                    clientCn = SslUtil.extractCN(subjectDn);
 | 
				
			||||||
                    key = ovsdbSessionMapInterface.lookupClientId(ovsdbClient);
 | 
					                    if (clientCn != null && !clientCn.isEmpty()) {
 | 
				
			||||||
                    if (key != null) {
 | 
					                        Long ctxRoutingId =  OvsdbClientUtil.getRoutingId(ovsdbClient);
 | 
				
			||||||
                        try {
 | 
					                        Long latestDbRoutingId = extIntegrationInterface.getLatestRoutingId(clientCn);
 | 
				
			||||||
                            extIntegrationInterface.apDisconnected(key);
 | 
					                        OvsdbSession ovsdbSession = ovsdbSessionMapInterface.getSession(clientCn);
 | 
				
			||||||
                            ovsdbSessionMapInterface.removeSession(key);
 | 
					
 | 
				
			||||||
                        } catch (Exception e) {
 | 
					                        LOG.info("Determine true disconnect with AP {} ctxRoutingId {} latestDbRoutingId {} ", clientCn, ctxRoutingId, latestDbRoutingId);
 | 
				
			||||||
                            LOG.debug("Unable to process ap disconnect. {}", e);
 | 
					                        // Determine whether this is a true disconnect based on latest routing record
 | 
				
			||||||
 | 
					                        if (ctxRoutingId != null && latestDbRoutingId != null) {
 | 
				
			||||||
 | 
					                            // if context matches latest DB routingId, this is a true disconnect
 | 
				
			||||||
 | 
					                            if (ctxRoutingId.equals(latestDbRoutingId)) {
 | 
				
			||||||
 | 
					                                try {
 | 
				
			||||||
 | 
					                                    extIntegrationInterface.apDisconnected(clientCn, ctxRoutingId);
 | 
				
			||||||
 | 
					                                    ovsdbSessionMapInterface.removeSession(clientCn);
 | 
				
			||||||
 | 
					                                } catch (Exception e) {
 | 
				
			||||||
 | 
					                                    LOG.debug("Unable to process ap disconnect. {}", e);
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            } else {
 | 
				
			||||||
 | 
					                                if (ovsdbSession != null) {
 | 
				
			||||||
 | 
					                                    if (!latestDbRoutingId.equals(ovsdbSession.getRoutingId())) {
 | 
				
			||||||
 | 
					                                        ovsdbSessionMapInterface.removeSession(clientCn);
 | 
				
			||||||
 | 
					                                    }
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                                
 | 
				
			||||||
 | 
					                                extIntegrationInterface.removeRoutingRecord(ctxRoutingId);
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                        // else, clearly handle all other cases
 | 
				
			||||||
 | 
					                        } else if (ctxRoutingId == null && latestDbRoutingId != null) {
 | 
				
			||||||
 | 
					                            // session was not initialized properly in connect path, we can remove this session
 | 
				
			||||||
 | 
					                            LOG.debug("ctxRoutingId null latestDbRoutingId {}, remove non-initialized session for AP {} ", latestDbRoutingId, clientCn);
 | 
				
			||||||
 | 
					                            if (ovsdbSession != null) {
 | 
				
			||||||
 | 
					                                if (ovsdbSession.getRoutingId() == 0L) {
 | 
				
			||||||
 | 
					                                    ovsdbSessionMapInterface.removeSession(clientCn);
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                        } else if (ctxRoutingId != null && latestDbRoutingId == null) {
 | 
				
			||||||
 | 
					                            // no routing exist at all, so we know this session is not using any routing record, we can remove this session
 | 
				
			||||||
 | 
					                            LOG.debug("ctxRoutingId {} latestDbRoutingId null, no routing exists, remove session for AP {} ", latestDbRoutingId, clientCn);
 | 
				
			||||||
 | 
					                            if (ovsdbSession != null) {
 | 
				
			||||||
 | 
					                                if (ctxRoutingId.equals(ovsdbSession.getRoutingId())) {
 | 
				
			||||||
 | 
					                                    ovsdbSessionMapInterface.removeSession(clientCn);
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                        } else {
 | 
				
			||||||
 | 
					                            // ctxRoutingId == null && latestDbRoutingId == null
 | 
				
			||||||
 | 
					                            // This session is not initialized properly and no routings exist for any session
 | 
				
			||||||
 | 
					                            // We can remove this session without any checks
 | 
				
			||||||
 | 
					                            LOG.debug("ctxRoutingId null latestDbRoutingId null, remove session for AP {} ", latestDbRoutingId, clientCn);
 | 
				
			||||||
 | 
					                            ovsdbSessionMapInterface.removeSession(clientCn);
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
 | 
					                        
 | 
				
			||||||
 | 
					                        LOG.info("ovsdbClient disconnected from {} on local port {} remote port {} clientCn {} ", remoteHost, localPort,
 | 
				
			||||||
 | 
					                                remotePort, clientCn);
 | 
				
			||||||
 | 
					                        LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    LOG.info("ovsdbClient disconnected from {} on port {} clientCn {} AP {} ", remoteHost, localPort, clientCn, key);
 | 
					 | 
				
			||||||
                    LOG.info("ovsdbClient connectedClients = {}", ovsdbSessionMapInterface.getNumSessions());
 | 
					 | 
				
			||||||
                } finally {
 | 
					                } finally {
 | 
				
			||||||
                    try {
 | 
					                    try {
 | 
				
			||||||
                        ovsdbClient.shutdown();
 | 
					                        ovsdbClient.shutdown();
 | 
				
			||||||
@@ -385,29 +440,56 @@ public class TipWlanOvsdbClient implements OvsdbClientInterface {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        OvsdbClient ovsdbClient = ovsdbSession.getOvsdbClient();
 | 
					        OvsdbClient ovsdbClient = ovsdbSession.getOvsdbClient();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        OpensyncAPConfig opensyncAPConfig = extIntegrationInterface.getApConfig(apId);
 | 
					        int currentConfigCount = ovsdbSession.getCurrentConfigNumInFlight().get();
 | 
				
			||||||
 | 
					        if (currentConfigCount == 0L) {
 | 
				
			||||||
        if (opensyncAPConfig == null) {
 | 
					            // Current count is 0, start this config push
 | 
				
			||||||
            LOG.warn("AP with id " + apId + " does not have a config to apply.");
 | 
					            // Increment other incoming configs into the count until this config push is done
 | 
				
			||||||
            return;
 | 
					            currentConfigCount = ovsdbSession.getCurrentConfigNumInFlight().incrementAndGet();
 | 
				
			||||||
 | 
					            do {
 | 
				
			||||||
 | 
					                try {
 | 
				
			||||||
 | 
					                    OpensyncAPConfig opensyncAPConfig = extIntegrationInterface.getApConfig(apId);
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    if (opensyncAPConfig == null) {
 | 
				
			||||||
 | 
					                        LOG.warn("AP with id " + apId + " does not have a config to apply.");
 | 
				
			||||||
 | 
					                        return;
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					                    //get last known configVersion from the AP
 | 
				
			||||||
 | 
					                    long configVersionFromAp = checkBeforePushConfigVersionFromAp ? ovsdbDao.getConfigVersionFromNode(ovsdbClient) : 0;
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    //get last known configVersion from the EquipmentProtocolStatusData        
 | 
				
			||||||
 | 
					                    long configVersionFromStatus = checkBeforePushConfigVersionFromStatus ? extIntegrationInterface.getConfigVersionFromStatus(apId) : 0;
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    //get current configVersion from the profiles and equipment
 | 
				
			||||||
 | 
					                    long configVersionFromProfiles = opensyncAPConfig.getConfigVersion();
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    boolean needToPushConfigToAP = needToPushToAp(configVersionFromAp, configVersionFromStatus, configVersionFromProfiles);
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					                    if(needToPushConfigToAP) {
 | 
				
			||||||
 | 
					                        pushConfigToAp(ovsdbClient, opensyncAPConfig, apId, configVersionFromProfiles);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    LOG.debug("Finished processConfigChanged for {}", apId);
 | 
				
			||||||
 | 
					                } catch (Exception ex) {
 | 
				
			||||||
 | 
					                    // If anything fails in the ovsdb config push, clean up count and exit
 | 
				
			||||||
 | 
					                    ovsdbSession.getCurrentConfigNumInFlight().set(0);
 | 
				
			||||||
 | 
					                    return;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                int checkCount = ovsdbSession.getCurrentConfigNumInFlight().get();
 | 
				
			||||||
 | 
					                if (checkCount == currentConfigCount) {
 | 
				
			||||||
 | 
					                    // Count didn't change from pre-config push, we can clean up and exit
 | 
				
			||||||
 | 
					                    if (ovsdbSession.getCurrentConfigNumInFlight().compareAndSet(currentConfigCount, 0)) {
 | 
				
			||||||
 | 
					                        return;
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                } else {
 | 
				
			||||||
 | 
					                    // Count has changed, update the currentConfigCount and rerun config push
 | 
				
			||||||
 | 
					                    ovsdbSession.getCurrentConfigNumInFlight().incrementAndGet();
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            } while (ovsdbSession.getCurrentConfigNumInFlight().get() != 0);
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            // Count is not 0, another request is being processed for this OvsdbSession
 | 
				
			||||||
 | 
					            // Remember this request, and the other thread will check the count when it's done
 | 
				
			||||||
 | 
					            ovsdbSession.getCurrentConfigNumInFlight().incrementAndGet();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					 | 
				
			||||||
        //get last known configVersion from the AP
 | 
					 | 
				
			||||||
        long configVersionFromAp = checkBeforePushConfigVersionFromAp ? ovsdbDao.getConfigVersionFromNode(ovsdbClient) : 0;
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        //get last known configVersion from the EquipmentProtocolStatusData        
 | 
					 | 
				
			||||||
        long configVersionFromStatus = checkBeforePushConfigVersionFromStatus ? extIntegrationInterface.getConfigVersionFromStatus(apId) : 0;
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        //get current configVersion from the profiles and equipment
 | 
					 | 
				
			||||||
        long configVersionFromProfiles = opensyncAPConfig.getConfigVersion();
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        boolean needToPushConfigToAP = needToPushToAp(configVersionFromAp, configVersionFromStatus, configVersionFromProfiles);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if(needToPushConfigToAP) {
 | 
					 | 
				
			||||||
            pushConfigToAp(ovsdbClient, opensyncAPConfig, apId, configVersionFromProfiles);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        LOG.debug("Finished processConfigChanged for {}", apId);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,6 +32,10 @@ import com.vmware.ovsdb.service.OvsdbClient;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
@Component
 | 
					@Component
 | 
				
			||||||
public class OvsdbNode extends OvsdbDaoBase {
 | 
					public class OvsdbNode extends OvsdbDaoBase {
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						@org.springframework.beans.factory.annotation.Value("${tip.wlan.internalHostName:localhost}") 
 | 
				
			||||||
 | 
					    private String internalHostName;
 | 
				
			||||||
 | 
						
 | 
				
			||||||
    String changeRedirectorAddress(OvsdbClient ovsdbClient, String apId, String newRedirectorAddress) {
 | 
					    String changeRedirectorAddress(OvsdbClient ovsdbClient, String apId, String newRedirectorAddress) {
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            List<Operation> operations = new ArrayList<>();
 | 
					            List<Operation> operations = new ArrayList<>();
 | 
				
			||||||
@@ -380,7 +384,7 @@ public class OvsdbNode extends OvsdbDaoBase {
 | 
				
			|||||||
            Map<String, String> newMqttSettings = new HashMap<>();
 | 
					            Map<String, String> newMqttSettings = new HashMap<>();
 | 
				
			||||||
            newMqttSettings.put("broker", mqttBrokerAddress);
 | 
					            newMqttSettings.put("broker", mqttBrokerAddress);
 | 
				
			||||||
            String mqttClientName = OvsdbToWlanCloudTypeMappingUtility.getAlteredClientCnIfRequired(clientCn, incomingConnectNodeInfo, preventCnAlteration);
 | 
					            String mqttClientName = OvsdbToWlanCloudTypeMappingUtility.getAlteredClientCnIfRequired(clientCn, incomingConnectNodeInfo, preventCnAlteration);
 | 
				
			||||||
            newMqttSettings.put("topics", "/ap/" + mqttClientName + "/opensync");
 | 
					            newMqttSettings.put("topics", "/ap/opensync_mqtt_" + internalHostName + "/" + mqttClientName + "/opensync");
 | 
				
			||||||
            newMqttSettings.put("port", "" + mqttBrokerExternalPort);
 | 
					            newMqttSettings.put("port", "" + mqttBrokerExternalPort);
 | 
				
			||||||
            newMqttSettings.put("compress", "zlib");
 | 
					            newMqttSettings.put("compress", "zlib");
 | 
				
			||||||
            newMqttSettings.put("qos", "0");
 | 
					            newMqttSettings.put("qos", "0");
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,6 +25,10 @@ public class OvsdbClientWithMetrics implements OvsdbClient {
 | 
				
			|||||||
        this.delegate = delegate;
 | 
					        this.delegate = delegate;
 | 
				
			||||||
        this.metrics = metrics;
 | 
					        this.metrics = metrics;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    public OvsdbClient getDelegate() {
 | 
				
			||||||
 | 
					        return delegate;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public CompletableFuture<String[]> listDatabases() throws OvsdbClientException {
 | 
					    public CompletableFuture<String[]> listDatabases() throws OvsdbClientException {
 | 
				
			||||||
        metrics.listDatabases.increment();
 | 
					        metrics.listDatabases.increment();
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -0,0 +1,65 @@
 | 
				
			|||||||
 | 
					package com.telecominfraproject.wlan.opensync.util;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.lang.reflect.Field;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.telecominfraproject.wlan.opensync.ovsdb.metrics.OvsdbClientWithMetrics;
 | 
				
			||||||
 | 
					import com.vmware.ovsdb.service.OvsdbClient;
 | 
				
			||||||
 | 
					import com.vmware.ovsdb.service.impl.OvsdbClientImpl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import io.netty.channel.Channel;
 | 
				
			||||||
 | 
					import io.netty.util.AttributeKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Uses reflection to associate routingId with low-level netty channel.
 | 
				
			||||||
 | 
					 * 
 | 
				
			||||||
 | 
					 * @author dtop
 | 
				
			||||||
 | 
					 * 
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					public class OvsdbClientUtil {
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    public static final AttributeKey<String> routingRecordIdAttrKey = AttributeKey.valueOf("routingRecordId");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public static void setRoutingId(OvsdbClient ovsdbClient, long routingId) {
 | 
				
			||||||
 | 
					        Channel channel = getChannel(ovsdbClient);
 | 
				
			||||||
 | 
					        channel.attr(routingRecordIdAttrKey).set(Long.toString(routingId));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public static Long getRoutingId(OvsdbClient ovsdbClient) {
 | 
				
			||||||
 | 
					        Channel channel = getChannel(ovsdbClient);
 | 
				
			||||||
 | 
					        String strVal = channel.attr(routingRecordIdAttrKey).get();
 | 
				
			||||||
 | 
					        return strVal==null?null:Long.parseLong(strVal);        
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    private static Channel getChannel(OvsdbClient ovsdbClient) {
 | 
				
			||||||
 | 
					        if(ovsdbClient instanceof OvsdbClientWithMetrics) {
 | 
				
			||||||
 | 
					            //unwrap the object, if needed
 | 
				
			||||||
 | 
					            ovsdbClient = ((OvsdbClientWithMetrics) ovsdbClient).getDelegate();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        if(! (ovsdbClient instanceof OvsdbClientImpl)) {
 | 
				
			||||||
 | 
					            throw new RuntimeException("Do not know how to handle "+ ovsdbClient.getClass().getName()+" - expected OvsdbClientImpl");
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        try {
 | 
				
			||||||
 | 
					            Field jsonRpcClientField = ovsdbClient.getClass().getDeclaredField("jsonRpcClient");
 | 
				
			||||||
 | 
					            jsonRpcClientField.setAccessible(true);
 | 
				
			||||||
 | 
					            Object jsonRpcClientObj = jsonRpcClientField.get(ovsdbClient);
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            Field transporterField = jsonRpcClientObj.getClass().getDeclaredField("transporter");
 | 
				
			||||||
 | 
					            transporterField.setAccessible(true);
 | 
				
			||||||
 | 
					            Object transporterObj = transporterField.get(jsonRpcClientObj);
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            Field channelField = transporterObj.getClass().getDeclaredField("val$channel");
 | 
				
			||||||
 | 
					            channelField.setAccessible(true);
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            Channel channel = (Channel) channelField.get(transporterObj);
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            return channel;
 | 
				
			||||||
 | 
					        }catch(Exception e) {
 | 
				
			||||||
 | 
					            throw new RuntimeException("Cannot get the channel for the ovsdbClient "+ ovsdbClient, e);            
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -14,6 +14,7 @@ import org.mockito.Mockito;
 | 
				
			|||||||
import org.mockito.MockitoSession;
 | 
					import org.mockito.MockitoSession;
 | 
				
			||||||
import org.mockito.quality.Strictness;
 | 
					import org.mockito.quality.Strictness;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
					import org.springframework.beans.factory.annotation.Autowired;
 | 
				
			||||||
 | 
					import org.springframework.beans.factory.annotation.Value;
 | 
				
			||||||
import org.springframework.boot.test.context.SpringBootTest;
 | 
					import org.springframework.boot.test.context.SpringBootTest;
 | 
				
			||||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 | 
					import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 | 
				
			||||||
import org.springframework.boot.test.mock.mockito.MockBean;
 | 
					import org.springframework.boot.test.mock.mockito.MockBean;
 | 
				
			||||||
@@ -85,7 +86,10 @@ public class OvsdbNodeTest {
 | 
				
			|||||||
    OvsdbRadiusProxyConfig ovsdbRadiusProxyConfig;
 | 
					    OvsdbRadiusProxyConfig ovsdbRadiusProxyConfig;
 | 
				
			||||||
    @MockBean(answer = Answers.RETURNS_MOCKS)
 | 
					    @MockBean(answer = Answers.RETURNS_MOCKS)
 | 
				
			||||||
    OvsdbGet ovsdbGet;
 | 
					    OvsdbGet ovsdbGet;
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    @Value("${tip.wlan.internalHostName:localhost}") 
 | 
				
			||||||
 | 
					    private String internalHostName;
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
    MockitoSession mockito;
 | 
					    MockitoSession mockito;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Configuration
 | 
					    @Configuration
 | 
				
			||||||
@@ -122,7 +126,7 @@ public class OvsdbNodeTest {
 | 
				
			|||||||
        newMqttSettings.put("broker", ovsdbNode.mqttBrokerAddress);
 | 
					        newMqttSettings.put("broker", ovsdbNode.mqttBrokerAddress);
 | 
				
			||||||
        String mqttClientName = OvsdbToWlanCloudTypeMappingUtility.getAlteredClientCnIfRequired("AP-1", connectNodeInfo,
 | 
					        String mqttClientName = OvsdbToWlanCloudTypeMappingUtility.getAlteredClientCnIfRequired("AP-1", connectNodeInfo,
 | 
				
			||||||
                false);
 | 
					                false);
 | 
				
			||||||
        newMqttSettings.put("topics", "/ap/" + mqttClientName + "/opensync");
 | 
					        newMqttSettings.put("topics", "/ap/opensync_mqtt_" + internalHostName + "/" + mqttClientName + "/opensync");
 | 
				
			||||||
        newMqttSettings.put("port", "" + ovsdbNode.mqttBrokerExternalPort);
 | 
					        newMqttSettings.put("port", "" + ovsdbNode.mqttBrokerExternalPort);
 | 
				
			||||||
        newMqttSettings.put("compress", "zlib");
 | 
					        newMqttSettings.put("compress", "zlib");
 | 
				
			||||||
        newMqttSettings.put("qos", "0");
 | 
					        newMqttSettings.put("qos", "0");
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user