From e3705699b47178ab42e9a45b6f0813681f3a47ec Mon Sep 17 00:00:00 2001 From: Jeffrey Han <39203126+elludraon@users.noreply.github.com> Date: Tue, 1 Nov 2022 15:12:44 -0700 Subject: [PATCH] Add station pinger to RRM service (#116) Signed-off-by: Jeffrey Han <39203126+elludraon@users.noreply.github.com> --- .../openwifi/cloudsdk/UCentralClient.java | 12 - .../openwifi/cloudsdk/UCentralUtils.java | 9 +- .../cloudsdk/kafka/UCentralKafkaConsumer.java | 15 +- .../openwifi/cloudsdk/models/ap/State.java | 2 +- .../java/com/facebook/openwifi/rrm/RRM.java | 7 + .../com/facebook/openwifi/rrm/RRMConfig.java | 8 + .../openwifi/rrm/modules/Modeler.java | 13 +- .../facebook/openwifi/rrm/rca/RCAConfig.java | 83 ++++++ .../facebook/openwifi/rrm/rca/RCAUtils.java | 116 ++++++++ .../rrm/rca/modules/StationPinger.java | 264 ++++++++++++++++++ 10 files changed, 509 insertions(+), 20 deletions(-) create mode 100644 owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAConfig.java create mode 100644 owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAUtils.java create mode 100644 owrrm/src/main/java/com/facebook/openwifi/rrm/rca/modules/StationPinger.java diff --git a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralClient.java b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralClient.java index 28fd763..90415e1 100644 --- a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralClient.java +++ b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralClient.java @@ -628,18 +628,6 @@ public class UCentralClient { } } - /** - * Instruct a device (AP) to ping a given destination (IP/hostname), - * returning the raw ping output or null upon error. 
- */ - public String pingFromDevice(String serialNumber, String host) { - // TODO pass options, parse output - final int PING_COUNT = 5; - String script = String.format("ping -c %d %s", PING_COUNT, host); - CommandInfo info = runScript(serialNumber, script); - return UCentralUtils.getScriptOutput(info); - } - /** Retrieve a list of inventory from owprov. */ public InventoryTagList getProvInventory() { HttpResponse response = httpGet("inventory", OWPROV_SERVICE); diff --git a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralUtils.java b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralUtils.java index 1d73812..52a1999 100644 --- a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralUtils.java +++ b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/UCentralUtils.java @@ -25,11 +25,11 @@ import java.util.zip.Inflater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.facebook.openwifi.cloudsdk.models.ap.Capabilities; import com.facebook.openwifi.cloudsdk.ies.Country; import com.facebook.openwifi.cloudsdk.ies.LocalPowerConstraint; import com.facebook.openwifi.cloudsdk.ies.QbssLoad; import com.facebook.openwifi.cloudsdk.ies.TxPwrInfo; +import com.facebook.openwifi.cloudsdk.models.ap.Capabilities; import com.facebook.openwifi.cloudsdk.models.ap.State; import com.facebook.openwifi.cloudsdk.models.gw.CommandInfo; import com.google.gson.Gson; @@ -566,7 +566,12 @@ public class UCentralUtils { return null; } JsonObject status = info.results.get("status").getAsJsonObject(); - if (!status.has("error") || status.get("error").getAsInt() != 0) { + if (!status.has("error")) { + return null; + } + int errorCode = status.get("error").getAsInt(); + if (errorCode != 0) { + logger.error("Script failed with code {}", errorCode); return null; } if (status.has("result")) { diff --git a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/kafka/UCentralKafkaConsumer.java 
b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/kafka/UCentralKafkaConsumer.java index fe23f8c..ef0df23 100644 --- a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/kafka/UCentralKafkaConsumer.java +++ b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/kafka/UCentralKafkaConsumer.java @@ -70,7 +70,12 @@ public class UCentralKafkaConsumer { /** The state payload JSON. */ public final JsonObject payload; - /** Unix time (ms). */ + /** + * The record timestamp (Unix time, in ms). + * + * Depending on the broker configuration for "message.timestamp.type", + * this may either be the "CreateTime" or "LogAppendTime". + */ public final long timestampMs; /** Constructor. */ @@ -85,7 +90,12 @@ public class UCentralKafkaConsumer { } } - /** Kafka record listener interface. */ + /** + * Kafka record listener interface. + * + * The inputs must NOT be mutated, as they may be passed to multiple + * listeners and may result in ConcurrentModificationException. + */ public interface KafkaListener { /** Handle a list of state records. */ void handleStateRecords(List records); @@ -271,7 +281,6 @@ public class UCentralKafkaConsumer { serialNumber, payload.toString() ); - // record.timestamp() is empirically confirmed to be Unix time (ms) KafkaRecord kafkaRecord = new KafkaRecord(serialNumber, payload, record.timestamp()); if (record.topic().equals(stateTopic)) { diff --git a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/models/ap/State.java b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/models/ap/State.java index d8ec24e..a550b07 100644 --- a/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/models/ap/State.java +++ b/lib-cloudsdk/src/main/java/com/facebook/openwifi/cloudsdk/models/ap/State.java @@ -117,7 +117,7 @@ public class State { public static class Radio { public long active_ms; public long busy_ms; - public int channel; + public int channel; // TODO might be int[] array?? 
public String channel_width; public long noise; public String phy; diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/RRM.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/RRM.java index 68c84e9..63847eb 100644 --- a/owrrm/src/main/java/com/facebook/openwifi/rrm/RRM.java +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/RRM.java @@ -30,6 +30,7 @@ import com.facebook.openwifi.rrm.modules.Modeler; import com.facebook.openwifi.rrm.modules.ProvMonitor; import com.facebook.openwifi.rrm.modules.RRMScheduler; import com.facebook.openwifi.rrm.mysql.DatabaseManager; +import com.facebook.openwifi.rrm.rca.modules.StationPinger; /** * RRM service runner. @@ -134,10 +135,16 @@ public class RRM { ) : null; KafkaRunner kafkaRunner = (consumer == null && producer == null) ? null : new KafkaRunner(consumer, producer); + StationPinger stationPinger = new StationPinger( + config.rcaConfig.stationPingerParams, + client, + consumer + ); // Add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.debug("Running shutdown hook..."); + stationPinger.shutdown(); if (kafkaRunner != null) { kafkaRunner.shutdown(); } diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/RRMConfig.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/RRMConfig.java index 342826f..d3dc95a 100644 --- a/owrrm/src/main/java/com/facebook/openwifi/rrm/RRMConfig.java +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/RRMConfig.java @@ -10,6 +10,8 @@ package com.facebook.openwifi.rrm; import java.util.Map; +import com.facebook.openwifi.rrm.rca.RCAConfig; + /** * RRM service configuration model. */ @@ -405,6 +407,9 @@ public class RRMConfig { /** Module configuration. */ public ModuleConfig moduleConfig = new ModuleConfig(); + /** Root cause analysis configuration. */ + public RCAConfig rcaConfig = new RCAConfig(); + /** Construct RRMConfig from environment variables. 
*/ public static RRMConfig fromEnv(Map env) { RRMConfig config = new RRMConfig(); @@ -583,6 +588,9 @@ public class RRMConfig { // @formatter:on + /* RCAConfig */ + config.rcaConfig = RCAConfig.fromEnv(env); + return config; } } diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/modules/Modeler.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/modules/Modeler.java index 10d4c01..f67eeed 100644 --- a/owrrm/src/main/java/com/facebook/openwifi/rrm/modules/Modeler.java +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/modules/Modeler.java @@ -8,6 +8,7 @@ package com.facebook.openwifi.rrm.modules; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -169,10 +170,15 @@ public class Modeler implements Runnable { consumer.addKafkaListener( getClass().getSimpleName(), new UCentralKafkaConsumer.KafkaListener() { + // NOTE: list copying due to potential modification in run() + @Override public void handleStateRecords(List records) { dataQueue.offer( - new InputData(InputDataType.STATE, records) + new InputData( + InputDataType.STATE, + new ArrayList<>(records) + ) ); } @@ -181,7 +187,10 @@ public class Modeler implements Runnable { List records ) { dataQueue.offer( - new InputData(InputDataType.WIFISCAN, records) + new InputData( + InputDataType.WIFISCAN, + new ArrayList<>(records) + ) ); } diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAConfig.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAConfig.java new file mode 100644 index 0000000..0dcfab8 --- /dev/null +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAConfig.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +package com.facebook.openwifi.rrm.rca; + +import java.util.Map; + +/** + * Root cause analysis service configuration model. + */ +public class RCAConfig { + // + // NOTE: + // Currently assumes RCA is embedded in the RRM service and does NOT + // duplicate SDK-related fields. + // + + /** + * StationPinger parameters. + */ + public class StationPingerParams { + /** + * How often to ping each station, in seconds (or 0 to disable) + * ({@code STATIONPINGERPARAMS_PINGINTERVALSEC}) + */ + // NOTE: cannot be shorter than Kafka "state" publish interval + public int pingIntervalSec = 0 /* TODO enable by default */; + + /** + * The number of pings to send to each station + * ({@code STATIONPINGERPARAMS_PINGCOUNT}) + */ + public int pingCount = 5; + + /** + * Ignore state records older than this interval (in ms) + * ({@code STATIONPINGERPARAMS_STALESTATETHRESHOLDMS}) + */ + // NOTE: should not be longer than Kafka "state" publish interval + public int staleStateThresholdMs = 300000; // 5 min + + /** + * Number of executor threads for ping tasks + * ({@code STATIONPINGERPARAMS_EXECUTORTHREADCOUNT}) + */ + public int executorThreadCount = 3; + } + + /** StationPinger parameters. */ + public StationPingerParams stationPingerParams = new StationPingerParams(); + + /** Construct RCAConfig from environment variables. 
*/ + public static RCAConfig fromEnv(Map<String, String> env) { + RCAConfig config = new RCAConfig(); + String v; + + // @formatter:off + + /* StationPingerParams */ + StationPingerParams stationPingerParams = config.stationPingerParams; + if ((v = env.get("STATIONPINGERPARAMS_PINGINTERVALSEC")) != null) { + stationPingerParams.pingIntervalSec = Integer.parseInt(v); + } + if ((v = env.get("STATIONPINGERPARAMS_PINGCOUNT")) != null) { + stationPingerParams.pingCount = Integer.parseInt(v); + } + if ((v = env.get("STATIONPINGERPARAMS_STALESTATETHRESHOLDMS")) != null) { + stationPingerParams.staleStateThresholdMs = Integer.parseInt(v); + } + if ((v = env.get("STATIONPINGERPARAMS_EXECUTORTHREADCOUNT")) != null) { + stationPingerParams.executorThreadCount = Integer.parseInt(v); + } + + // @formatter:on + + return config; + } +} diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAUtils.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAUtils.java new file mode 100644 index 0000000..9cb11d7 --- /dev/null +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/RCAUtils.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.facebook.openwifi.rrm.rca; + +import com.facebook.openwifi.cloudsdk.UCentralClient; +import com.facebook.openwifi.cloudsdk.UCentralUtils; +import com.facebook.openwifi.cloudsdk.models.gw.CommandInfo; + +/** + * Utilities for root cause analysis. + */ +public class RCAUtils { + /** Ping result, only containing a data summary (not individual pings).
*/ + public static class PingResult { + // NOTE: fields are taken directly from ping output + /** Minimum ping RTT (ms) */ + public double min; + /** Average ping RTT (ms) */ + public double avg; + /** Maximum ping RTT (ms) */ + public double max; + /** Standard deviation of ping RTT measurements (ms) */ + public double mdev; + // TODO other stats? (ex. tx/rx packets, % packet loss) + + @Override + public String toString() { + return String.format("%.3f/%.3f/%.3f/%.3f ms", min, avg, max, mdev); + } + } + + /** + * Parse raw ping output, returning null upon error. + * + * This only supports the busybox ping format. + */ + private static PingResult parsePingOutput(String output) { + // Only parse summary line (should be last line in output). + // Code below is optimized for minimal string operations. + // + // Examples of supported formats: + // round-trip min/avg/max = 4.126/42.470/84.081 ms + // rtt min/avg/max/mdev = 16.853/20.114/23.375/3.261 ms + final String SUMMARY_TEXT_3 = "min/avg/max"; + int idx = output.lastIndexOf(SUMMARY_TEXT_3); + if (idx != -1) { + idx += SUMMARY_TEXT_3.length(); + } else { + final String SUMMARY_TEXT_4 = "min/avg/max/mdev"; + idx = output.lastIndexOf(SUMMARY_TEXT_4); + if (idx != -1) { + idx += SUMMARY_TEXT_4.length(); + } else { + return null; + } + } + PingResult result = null; + for (; idx < output.length(); idx++) { + if (Character.isDigit(output.charAt(idx))) { + break; + } + } + if (idx < output.length()) { + int endIdx = output.indexOf(' ', idx); + if (endIdx != -1) { + String s = output.substring(idx, endIdx); + String[] tokens = s.split("/"); + if (tokens.length == 3) { + result = new PingResult(); + result.min = Double.parseDouble(tokens[0]); + result.avg = Double.parseDouble(tokens[1]); + result.max = Double.parseDouble(tokens[2]); + } else if (tokens.length == 4) { + result = new PingResult(); + result.min = Double.parseDouble(tokens[0]); + result.avg = Double.parseDouble(tokens[1]); + result.max = 
Double.parseDouble(tokens[2]); + result.mdev = Double.parseDouble(tokens[3]); + } + } + } + return result; + } + + /** + * Instruct a device (AP) to ping a given destination (IP/hostname), + * returning the raw ping output or null upon error. + * + * @param client the UCentralClient instance + * @param serialNumber the device (AP) serial number + * @param host the ping destination + * @param pingCount the number of pings to send + * @return the ping output, or null upon error + */ + public static PingResult pingFromDevice( + UCentralClient client, + String serialNumber, + String host, + int pingCount + ) { + if (pingCount < 1) { + throw new IllegalArgumentException("Invalid pingCount < 1"); + } + String script = String.format("ping -c %d %s", pingCount, host); + int timeoutSec = pingCount /* time buffer as follows: */ * 2 + 10; + CommandInfo info = client.runScript(serialNumber, script, timeoutSec); + String output = UCentralUtils.getScriptOutput(info); + return output != null ? parsePingOutput(output) : null; + } +} diff --git a/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/modules/StationPinger.java b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/modules/StationPinger.java new file mode 100644 index 0000000..8680708 --- /dev/null +++ b/owrrm/src/main/java/com/facebook/openwifi/rrm/rca/modules/StationPinger.java @@ -0,0 +1,264 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +package com.facebook.openwifi.rrm.rca.modules; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.facebook.openwifi.cloudsdk.UCentralClient; +import com.facebook.openwifi.cloudsdk.UCentralUtils; +import com.facebook.openwifi.cloudsdk.kafka.UCentralKafkaConsumer; +import com.facebook.openwifi.cloudsdk.kafka.UCentralKafkaConsumer.KafkaRecord; +import com.facebook.openwifi.cloudsdk.models.ap.State; +import com.facebook.openwifi.cloudsdk.models.gw.ServiceEvent; +import com.facebook.openwifi.rrm.Utils; +import com.facebook.openwifi.rrm.rca.RCAConfig.StationPingerParams; +import com.facebook.openwifi.rrm.rca.RCAUtils; +import com.facebook.openwifi.rrm.rca.RCAUtils.PingResult; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; + +/** + * Ping service to measure latency/jitter between Wi-Fi APs and clients + * (stations). + *
<p>
+ * This class subscribes to the Kafka "state" topic to retrieve the list of APs + * with connected clients, then issues ping commands for each (AP, STA) pair at + * a given frequency. All actions are submitted to an executor during Kafka + * callbacks. + */ +public class StationPinger { + private static final Logger logger = + LoggerFactory.getLogger(StationPinger.class); + + /** The module parameters. */ + private final StationPingerParams params; + + /** The uCentral client. */ + private final UCentralClient uCentralClient; + + /** The executor service instance. */ + private final ExecutorService executor; + + /** The Gson instance. */ + private final Gson gson = new Gson(); + + /** + * Map from device (serial number) to the latest map of STAs + * (i.e. client MAC address to Client structure). + */ + private Map<String, Map<String, State.Interface.Client>> deviceToClients = + new ConcurrentHashMap<>(); + + /** + * Map of last ping timestamps, keyed on + * {@link #getDeviceKey(String, String)}. + */ + private Map<String, Long> lastPingTsMap = new ConcurrentHashMap<>(); + + /** Constructor.
*/ + public StationPinger( + StationPingerParams params, + UCentralClient uCentralClient, + UCentralKafkaConsumer consumer + ) { + this.params = params; + this.uCentralClient = uCentralClient; + this.executor = + Executors.newFixedThreadPool( + params.executorThreadCount, + new Utils.NamedThreadFactory( + "RCA_" + this.getClass().getSimpleName() + ) + ); + + if (params.pingIntervalSec < 1) { + logger.info("StationPinger is disabled"); + return; // quit before registering listeners + } + + // Register Kafka listener + if (consumer != null) { + consumer.addKafkaListener( + getClass().getSimpleName(), + new UCentralKafkaConsumer.KafkaListener() { + @Override + public void handleStateRecords(List<KafkaRecord> records) { + handleKafkaStateRecords(records); + } + + @Override + public void handleWifiScanRecords( + List<KafkaRecord> records + ) { /* ignored */ } + + @Override + public void handleServiceEventRecords( + List<ServiceEvent> serviceEventRecords + ) { /* ignored */ } + } + ); + } + } + + /** Process the list of received State records.
*/ + private void handleKafkaStateRecords(List<KafkaRecord> records) { + long now = System.currentTimeMillis(); + for (KafkaRecord record : records) { + // Drop old records + if (now - record.timestampMs > params.staleStateThresholdMs) { + logger.debug( + "Dropping old state record for {} at time {}", + record.serialNumber, + record.timestampMs + ); + continue; + } + + // Deserialize State + JsonObject state = record.payload.getAsJsonObject("state"); + if (state == null) { + continue; + } + try { + State stateModel = gson.fromJson(state, State.class); + Map<String, State.Interface.Client> clientMap = + UCentralUtils.getWifiClientInfo(stateModel); + if ( + deviceToClients.put(record.serialNumber, clientMap) == null + ) { + // Enqueue this device + final String serialNumber = record.serialNumber; + executor.submit(() -> pingDevices(serialNumber)); + } + } catch (JsonSyntaxException e) { + logger.error( + String.format( + "Device %s: failed to deserialize state: %s", + record.serialNumber, + state + ), + e + ); + } + } + } + + /** Shut down all resources. */ + public void shutdown() { + executor.shutdownNow(); + } + + /** + * Issue ping commands to all clients of a given AP. + * + * Note that this is intentionally NOT parallelized to avoid collisions + * while transmitting to/from multiple clients of the same AP.
+ */ + private void pingDevices(String serialNumber) { + Map<String, State.Interface.Client> clientMap = + deviceToClients.get(serialNumber); + if (clientMap == null) { + return; // shouldn't happen + } + + logger.trace( + "{}: Pinging all clients ({} total)...", + serialNumber, + clientMap.size() + ); + final long PING_INTERVAL_NS = + Math.max(params.pingIntervalSec, 1) * 1_000_000_000L; + for ( + Map.Entry<String, State.Interface.Client> entry : clientMap + .entrySet() + ) { + String mac = entry.getKey(); + String host = getClientAddress(entry.getValue()); + if (host == null) { + logger.debug( + "{}: client {} has no pingable address", + serialNumber, + mac + ); + continue; + } + + // Check backoff timer + long now = System.nanoTime(); + String deviceKey = getDeviceKey(serialNumber, mac); + Long lastPingTs = lastPingTsMap.putIfAbsent(deviceKey, now); + if (lastPingTs != null && now - lastPingTs < PING_INTERVAL_NS) { + logger.trace( + "{}: Skipping ping for {} (last pinged {}s ago)", + serialNumber, + mac, + (now - lastPingTs) / 1_000_000_000L + ); + continue; + } + lastPingTsMap.put(deviceKey, now); + + // Issue ping command + logger.debug( + "{}: Pinging client {} ({})", + serialNumber, + mac, + host + ); + PingResult result = RCAUtils + .pingFromDevice(uCentralClient, serialNumber, host, params.pingCount); + if (result == null) { + logger.debug( + "Ping failed from {} to {} ({})", + serialNumber, + mac, + host + ); + continue; + } + // TODO handle results + logger.info( + "Ping result from {} to {} ({}): {}", + serialNumber, + mac, + host, + result.toString() + ); + } + + // Remove map entries after we process them + deviceToClients.remove(serialNumber); + } + + /** Return an address to ping for the given client.
*/ + private String getClientAddress(State.Interface.Client client) { + if (client.ipv4_addresses.length > 0) { + return client.ipv4_addresses[0]; + } else if (client.ipv6_addresses.length > 0) { + return client.ipv6_addresses[0]; + } else { + return null; + } + } + + /** Return a key to use in {@link #lastPingTsMap}. */ + private String getDeviceKey(String serialNumber, String sta) { + // Use (AP, STA) pair as the key to handle STAs moving between APs + // TODO - do we care about radio/band/channel changes too? + return serialNumber + '\0' + sta; + } +}