Add station pinger to RRM service (#116)

Signed-off-by: Jeffrey Han <39203126+elludraon@users.noreply.github.com>
This commit is contained in:
Jeffrey Han
2022-11-01 15:12:44 -07:00
committed by GitHub
parent 35eddf73cf
commit e3705699b4
10 changed files with 509 additions and 20 deletions

View File

@@ -628,18 +628,6 @@ public class UCentralClient {
}
}
/**
* Instruct a device (AP) to ping a given destination (IP/hostname),
* returning the raw ping output or null upon error.
*/
public String pingFromDevice(String serialNumber, String host) {
// TODO pass options, parse output
final int PING_COUNT = 5;
String script = String.format("ping -c %d %s", PING_COUNT, host);
CommandInfo info = runScript(serialNumber, script);
return UCentralUtils.getScriptOutput(info);
}
/** Retrieve a list of inventory from owprov. */
public InventoryTagList getProvInventory() {
HttpResponse<String> response = httpGet("inventory", OWPROV_SERVICE);

View File

@@ -25,11 +25,11 @@ import java.util.zip.Inflater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.facebook.openwifi.cloudsdk.models.ap.Capabilities;
import com.facebook.openwifi.cloudsdk.ies.Country;
import com.facebook.openwifi.cloudsdk.ies.LocalPowerConstraint;
import com.facebook.openwifi.cloudsdk.ies.QbssLoad;
import com.facebook.openwifi.cloudsdk.ies.TxPwrInfo;
import com.facebook.openwifi.cloudsdk.models.ap.Capabilities;
import com.facebook.openwifi.cloudsdk.models.ap.State;
import com.facebook.openwifi.cloudsdk.models.gw.CommandInfo;
import com.google.gson.Gson;
@@ -566,7 +566,12 @@ public class UCentralUtils {
return null;
}
JsonObject status = info.results.get("status").getAsJsonObject();
if (!status.has("error") || status.get("error").getAsInt() != 0) {
if (!status.has("error")) {
return null;
}
int errorCode = status.get("error").getAsInt();
if (errorCode != 0) {
logger.error("Script failed with code {}", errorCode);
return null;
}
if (status.has("result")) {

View File

@@ -70,7 +70,12 @@ public class UCentralKafkaConsumer {
/** The state payload JSON. */
public final JsonObject payload;
/** Unix time (ms). */
/**
* The record timestamp (Unix time, in ms).
*
* Depending on the broker configuration for "message.timestamp.type",
* this may either be the "CreateTime" or "LogAppendTime".
*/
public final long timestampMs;
/** Constructor. */
@@ -85,7 +90,12 @@ public class UCentralKafkaConsumer {
}
}
/** Kafka record listener interface. */
/**
* Kafka record listener interface.
*
* The inputs must NOT be mutated, as they may be passed to multiple
* listeners and may result in ConcurrentModificationException.
*/
public interface KafkaListener {
/** Handle a list of state records. */
void handleStateRecords(List<KafkaRecord> records);
@@ -271,7 +281,6 @@ public class UCentralKafkaConsumer {
serialNumber,
payload.toString()
);
// record.timestamp() is empirically confirmed to be Unix time (ms)
KafkaRecord kafkaRecord =
new KafkaRecord(serialNumber, payload, record.timestamp());
if (record.topic().equals(stateTopic)) {

View File

@@ -117,7 +117,7 @@ public class State {
public static class Radio {
public long active_ms;
public long busy_ms;
public int channel;
public int channel; // TODO might be int[] array??
public String channel_width;
public long noise;
public String phy;

View File

@@ -30,6 +30,7 @@ import com.facebook.openwifi.rrm.modules.Modeler;
import com.facebook.openwifi.rrm.modules.ProvMonitor;
import com.facebook.openwifi.rrm.modules.RRMScheduler;
import com.facebook.openwifi.rrm.mysql.DatabaseManager;
import com.facebook.openwifi.rrm.rca.modules.StationPinger;
/**
* RRM service runner.
@@ -134,10 +135,16 @@ public class RRM {
) : null;
KafkaRunner kafkaRunner = (consumer == null && producer == null)
? null : new KafkaRunner(consumer, producer);
StationPinger stationPinger = new StationPinger(
config.rcaConfig.stationPingerParams,
client,
consumer
);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.debug("Running shutdown hook...");
stationPinger.shutdown();
if (kafkaRunner != null) {
kafkaRunner.shutdown();
}

View File

@@ -10,6 +10,8 @@ package com.facebook.openwifi.rrm;
import java.util.Map;
import com.facebook.openwifi.rrm.rca.RCAConfig;
/**
* RRM service configuration model.
*/
@@ -405,6 +407,9 @@ public class RRMConfig {
/** Module configuration. */
public ModuleConfig moduleConfig = new ModuleConfig();
/** Root cause analysis configuration. */
public RCAConfig rcaConfig = new RCAConfig();
/** Construct RRMConfig from environment variables. */
public static RRMConfig fromEnv(Map<String, String> env) {
RRMConfig config = new RRMConfig();
@@ -583,6 +588,9 @@ public class RRMConfig {
// @formatter:on
/* RCAConfig */
config.rcaConfig = RCAConfig.fromEnv(env);
return config;
}
}

View File

@@ -8,6 +8,7 @@
package com.facebook.openwifi.rrm.modules;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -169,10 +170,15 @@ public class Modeler implements Runnable {
consumer.addKafkaListener(
getClass().getSimpleName(),
new UCentralKafkaConsumer.KafkaListener() {
// NOTE: list copying due to potential modification in run()
@Override
public void handleStateRecords(List<KafkaRecord> records) {
dataQueue.offer(
new InputData(InputDataType.STATE, records)
new InputData(
InputDataType.STATE,
new ArrayList<>(records)
)
);
}
@@ -181,7 +187,10 @@ public class Modeler implements Runnable {
List<KafkaRecord> records
) {
dataQueue.offer(
new InputData(InputDataType.WIFISCAN, records)
new InputData(
InputDataType.WIFISCAN,
new ArrayList<>(records)
)
);
}

View File

@@ -0,0 +1,83 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
package com.facebook.openwifi.rrm.rca;
import java.util.Map;
/**
* Root cause analysis service configuration model.
*/
public class RCAConfig {
//
// NOTE:
// Currently assumes RCA is embedded in the RRM service and does NOT
// duplicate SDK-related fields.
//
/**
* StationPinger parameters.
*/
public class StationPingerParams {
/**
* How often to ping each station, in seconds (or 0 to disable)
* ({@code STATIONPINGERPARAMS_PINGINTERVALSEC})
*/
// NOTE: cannot be shorter than Kafka "state" publish interval
public int pingIntervalSec = 0 /* TODO enable by default */;
/**
* The number of pings to send to each station
* ({@code STATIONPINGERPARAMS_PINGCOUNT})
*/
public int pingCount = 5;
/**
* Ignore state records older than this interval (in ms)
* ({@code STATIONPINGERPARAMS_STALESTATETHRESHOLDMS})
*/
// NOTE: should not be longer than Kafka "state" publish interval
public int staleStateThresholdMs = 300000; // 5 min
/**
* Number of executor threads for ping tasks
* ({@code STATIONPINGERPARAMS_EXECUTORTHREADCOUNT})
*/
public int executorThreadCount = 3;
}
/** StationPinger parameters. */
public StationPingerParams stationPingerParams = new StationPingerParams();
/** Construct RCAConfig from environment variables. */
public static RCAConfig fromEnv(Map<String, String> env) {
RCAConfig config = new RCAConfig();
String v;
// @formatter:off
/* StationPingerParams */
StationPingerParams stationPingerParams = config.stationPingerParams;
if ((v = env.get("STATIONPINGERPARAMS_PINGINTERVALSEC")) != null) {
stationPingerParams.pingIntervalSec = Integer.parseInt(v);
}
if ((v = env.get("STATIONPINGERPARAMS_PINGCOUNT")) != null) {
stationPingerParams.pingCount = Integer.parseInt(v);
}
if ((v = env.get("STATIONPINGERPARAMS_STALESTATETHRESHOLDMS")) != null) {
stationPingerParams.staleStateThresholdMs = Integer.parseInt(v);
}
if ((v = env.get("STATIONPINGERPARAMS_EXECUTORTHREADCOUNT")) != null) {
stationPingerParams.executorThreadCount = Integer.parseInt(v);
}
// @formatter:on
return config;
}
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
package com.facebook.openwifi.rrm.rca;
import com.facebook.openwifi.cloudsdk.UCentralClient;
import com.facebook.openwifi.cloudsdk.UCentralUtils;
import com.facebook.openwifi.cloudsdk.models.gw.CommandInfo;
/**
* Utilities for root cause analysis.
*/
public class RCAUtils {
/** Ping result, only containing a data summary (not individual pings). */
public static class PingResult {
// NOTE: fields are taken directly from ping output
/** Minimum ping RTT (ms) */
public double min;
/** Average ping RTT (ms) */
public double avg;
/** Maximum ping RTT (ms) */
public double max;
/** Standard deviation of ping RTT measurements (ms) */
public double mdev;
// TODO other stats? (ex. tx/rx packets, % packet loss)
@Override
public String toString() {
return String.format("%.3f/%.3f/%.3f/%.3f ms", min, avg, max, mdev);
}
}
/**
* Parse raw ping output, returning null upon error.
*
* This only supports the busybox ping format.
*/
private static PingResult parsePingOutput(String output) {
// Only parse summary line (should be last line in output).
// Code below is optimized for minimal string operations.
//
// Examples of supported formats:
// round-trip min/avg/max = 4.126/42.470/84.081 ms
// rtt min/avg/max/mdev = 16.853/20.114/23.375/3.261 ms
final String SUMMARY_TEXT_3 = "min/avg/max";
int idx = output.lastIndexOf(SUMMARY_TEXT_3);
if (idx != -1) {
idx += SUMMARY_TEXT_3.length();
} else {
final String SUMMARY_TEXT_4 = "min/avg/max/mdev";
idx = output.lastIndexOf(SUMMARY_TEXT_4);
if (idx != -1) {
idx += SUMMARY_TEXT_4.length();
} else {
return null;
}
}
PingResult result = null;
for (; idx < output.length(); idx++) {
if (Character.isDigit(output.charAt(idx))) {
break;
}
}
if (idx < output.length()) {
int endIdx = output.indexOf(' ', idx);
if (endIdx != -1) {
String s = output.substring(idx, endIdx);
String[] tokens = s.split("/");
if (tokens.length == 3) {
result = new PingResult();
result.min = Double.parseDouble(tokens[0]);
result.avg = Double.parseDouble(tokens[1]);
result.max = Double.parseDouble(tokens[2]);
} else if (tokens.length == 4) {
result = new PingResult();
result.min = Double.parseDouble(tokens[0]);
result.avg = Double.parseDouble(tokens[1]);
result.max = Double.parseDouble(tokens[2]);
result.mdev = Double.parseDouble(tokens[3]);
}
}
}
return result;
}
/**
* Instruct a device (AP) to ping a given destination (IP/hostname),
* returning the raw ping output or null upon error.
*
* @param client the UCentralClient instance
* @param serialNumber the device (AP) serial number
* @param host the ping destination
* @param pingCount the number of pings to send
* @return the ping output, or null upon error
*/
public static PingResult pingFromDevice(
UCentralClient client,
String serialNumber,
String host,
int pingCount
) {
if (pingCount < 1) {
throw new IllegalArgumentException("Invalid pingCount < 1");
}
String script = String.format("ping -c %d %s", pingCount, host);
int timeoutSec = pingCount /* time buffer as follows: */ * 2 + 10;
CommandInfo info = client.runScript(serialNumber, script, timeoutSec);
String output = UCentralUtils.getScriptOutput(info);
return output != null ? parsePingOutput(output) : null;
}
}

View File

@@ -0,0 +1,264 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
package com.facebook.openwifi.rrm.rca.modules;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.facebook.openwifi.cloudsdk.UCentralClient;
import com.facebook.openwifi.cloudsdk.UCentralUtils;
import com.facebook.openwifi.cloudsdk.kafka.UCentralKafkaConsumer;
import com.facebook.openwifi.cloudsdk.kafka.UCentralKafkaConsumer.KafkaRecord;
import com.facebook.openwifi.cloudsdk.models.ap.State;
import com.facebook.openwifi.cloudsdk.models.gw.ServiceEvent;
import com.facebook.openwifi.rrm.Utils;
import com.facebook.openwifi.rrm.rca.RCAConfig.StationPingerParams;
import com.facebook.openwifi.rrm.rca.RCAUtils;
import com.facebook.openwifi.rrm.rca.RCAUtils.PingResult;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
/**
* Ping service to measure latency/jitter between Wi-Fi APs and clients
* (stations).
* <p>
* This class subscribes to the Kafka "state" topic to retrieve the list of APs
* with connected clients, then issues ping commands for each (AP, STA) pair at
* a given frequency. All actions are submitted to an executor during Kafka
* callbacks.
*/
public class StationPinger {
private static final Logger logger =
LoggerFactory.getLogger(StationPinger.class);
/** The module parameters. */
private final StationPingerParams params;
/** The uCentral client. */
private final UCentralClient uCentralClient;
/** The executor service instance. */
private final ExecutorService executor;
/** The Gson instance. */
private final Gson gson = new Gson();
/**
* Map from device (serial number) to the latest map of STAs
* (i.e. client MAC address to Client structure).
*/
private Map<String, Map<String, State.Interface.Client>> deviceToClients =
new ConcurrentHashMap<>();
/**
* Map of last ping timestamps, keyed on
* {@link #getDeviceKey(String, String)}.
*/
private Map<String, Long> lastPingTsMap = new ConcurrentHashMap<>();
/** Constructor. */
public StationPinger(
StationPingerParams params,
UCentralClient uCentralClient,
UCentralKafkaConsumer consumer
) {
this.params = params;
this.uCentralClient = uCentralClient;
this.executor =
Executors.newFixedThreadPool(
params.executorThreadCount,
new Utils.NamedThreadFactory(
"RCA_" + this.getClass().getSimpleName()
)
);
if (params.pingIntervalSec < 1) {
logger.info("StationPinger is disabled");
return; // quit before registering listeners
}
// Register Kafka listener
if (consumer != null) {
consumer.addKafkaListener(
getClass().getSimpleName(),
new UCentralKafkaConsumer.KafkaListener() {
@Override
public void handleStateRecords(List<KafkaRecord> records) {
handleKafkaStateRecords(records);
}
@Override
public void handleWifiScanRecords(
List<KafkaRecord> records
) { /* ignored */ }
@Override
public void handleServiceEventRecords(
List<ServiceEvent> serviceEventRecords
) { /* ignored */ }
}
);
}
}
/** Process the list of received State records. */
private void handleKafkaStateRecords(List<KafkaRecord> records) {
long now = System.currentTimeMillis();
for (KafkaRecord record : records) {
// Drop old records
if (now - record.timestampMs > params.staleStateThresholdMs) {
logger.debug(
"Dropping old state record for {} at time {}",
record.serialNumber,
record.timestampMs
);
continue;
}
// Deserialize State
JsonObject state = record.payload.getAsJsonObject("state");
if (state == null) {
continue;
}
try {
State stateModel = gson.fromJson(state, State.class);
Map<String, State.Interface.Client> clientMap =
UCentralUtils.getWifiClientInfo(stateModel);
if (
deviceToClients.put(record.serialNumber, clientMap) == null
) {
// Enqueue this device
final String serialNumber = record.serialNumber;
executor.submit(() -> pingDevices(serialNumber));
}
} catch (JsonSyntaxException e) {
logger.error(
String.format(
"Device %s: failed to deserialize state: %s",
record.serialNumber,
state
),
e
);
}
}
}
/** Shut down all resources. */
public void shutdown() {
executor.shutdownNow();
}
/**
* Issue ping commands to all clients of a given AP.
*
* Note that this is intentionally NOT parallelized to avoid collisions
* while transmitting to/from multiple clients of the same AP.
*/
private void pingDevices(String serialNumber) {
Map<String, State.Interface.Client> clientMap =
deviceToClients.get(serialNumber);
if (clientMap == null) {
return; // shouldn't happen
}
logger.trace(
"{}: Pinging all clients ({} total)...",
serialNumber,
clientMap.size()
);
final long PING_INTERVAL_NS =
Math.max(params.pingIntervalSec, 1) * 1_000_000_000L;
for (
Map.Entry<String, State.Interface.Client> entry : clientMap
.entrySet()
) {
String mac = entry.getKey();
String host = getClientAddress(entry.getValue());
if (host == null) {
logger.debug(
"{}: client {} has no pingable address",
serialNumber,
mac
);
continue;
}
// Check backoff timer
long now = System.nanoTime();
String deviceKey = getDeviceKey(serialNumber, mac);
Long lastPingTs = lastPingTsMap.putIfAbsent(deviceKey, now);
if (lastPingTs != null && now - lastPingTs < PING_INTERVAL_NS) {
logger.trace(
"{}: Skipping ping for {} (last pinged {}s ago)",
serialNumber,
mac,
(now - lastPingTs) / 1_000_000_000L
);
continue;
}
lastPingTsMap.put(deviceKey, now);
// Issue ping command
logger.debug(
"{}: Pinging client {} ({})",
serialNumber,
mac,
host
);
PingResult result = RCAUtils
.pingFromDevice(uCentralClient, serialNumber, host, params.pingCount);
if (result == null) {
logger.debug(
"Ping failed from {} to {} ({})",
serialNumber,
mac,
host
);
continue;
}
// TODO handle results
logger.info(
"Ping result from {} to {} ({}): {}",
serialNumber,
mac,
host,
result.toString()
);
}
// Remove map entries after we process them
deviceToClients.remove(serialNumber);
}
/** Return an address to ping for the given client. */
private String getClientAddress(State.Interface.Client client) {
if (client.ipv4_addresses.length > 0) {
return client.ipv4_addresses[0];
} else if (client.ipv6_addresses.length > 0) {
return client.ipv6_addresses[0];
} else {
return null;
}
}
/** Return a key to use in {@link #lastPingTsMap}. */
private String getDeviceKey(String serialNumber, String sta) {
// Use (AP, STA) pair as the key to handle STAs moving between APs
// TODO - do we care about radio/band/channel changes too?
return serialNumber + '\0' + sta;
}
}