Topo-map: Implement realtime (leave/join) client events handling

- Timestamp for events is calculated upon socket read,
   which is the most accurate TS we can have;
 - topo map uses the TS in calculations to deduce the
   'latest' most-actual event while constructing map;
 - Origin of edges and nodes is preserved, which means
   topomap would not delete any nodes missing from state
   message, in case if this node is also a directly
   connected uCentral device instance;
 - Disconnected leaf-nodes are automatically removed,
   in case if their origin's not uCentral direct connection
   (info about node received from state/rt evts etc)

Signed-off-by: Oleksandr Mazur <oleksandr.mazur@plvision.eu>
This commit is contained in:
Oleksandr Mazur
2024-05-17 16:30:21 +03:00
parent 79fd5be64e
commit cee366eb06
8 changed files with 1331 additions and 191 deletions

View File

@@ -36,6 +36,7 @@ base64 = "0.22.0"
rustls-pemfile = "2.1.2"
rustls-pki-types = "1.7.0"
x509-parser = "0.16.0"
chrono = "0.4.38"
[build-dependencies]
tonic-build = "0.10"

View File

@@ -9,9 +9,10 @@ use crate::{
cgw_ucentral_event_parse, cgw_ucentral_parse_connect_event, CGWUCentralCommandType,
CGWUCentralEventType,
},
cgw_ucentral_topology_map::CGWUcentralTopologyMap,
cgw_ucentral_topology_map::CGWUCentralTopologyMap,
};
use chrono::offset::Local;
use eui48::MacAddress;
use futures_util::{
stream::{SplitSink, SplitStream},
@@ -225,15 +226,21 @@ impl CGWConnectionProcessor {
fsm_state: &mut CGWUCentralMessageProcessorState,
pending_req_id: u64,
) -> Result<CGWConnectionState, &'static str> {
// Make sure we always track the as accurate as possible the time
// of receiving of the event (where needed).
let timestamp = Local::now();
match msg {
Ok(msg) => match msg {
Close(_t) => {
return Ok(CGWConnectionState::ClosedGracefully);
}
Text(payload) => {
if let Ok(evt) = cgw_ucentral_event_parse(&device_type, &payload) {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
if let CGWUCentralEventType::State(_) = evt.evt_type {
let topo_map = CGWUcentralTopologyMap::get_ref();
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
topo_map.debug_dump_map().await;
} else if let CGWUCentralEventType::Reply(content) = evt.evt_type {
@@ -241,6 +248,12 @@ impl CGWConnectionProcessor {
assert_eq!(content.id, pending_req_id);
*fsm_state = CGWUCentralMessageProcessorState::Idle;
debug!("Got reply event for pending request id: {}", pending_req_id);
} else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map
.process_device_topology_event(&device_type, evt)
.await;
topo_map.debug_dump_map().await;
}
}

View File

@@ -7,7 +7,7 @@ use crate::cgw_ucentral_messages_queue_manager::{
};
use crate::cgw_ucentral_parser::{cgw_ucentral_parse_command_message, CGWUCentralCommand};
use crate::cgw_ucentral_parser::{CGWDeviceChange, CGWDeviceChangedData, CGWToNBMessageType};
use crate::cgw_ucentral_topology_map::CGWUcentralTopologyMap;
use crate::cgw_ucentral_topology_map::CGWUCentralTopologyMap;
use crate::AppArgs;
use crate::{
@@ -1025,7 +1025,7 @@ impl CGWConnectionServer {
);
}
let topo_map = CGWUcentralTopologyMap::get_ref();
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.insert_device(&device_mac).await;
topo_map.debug_dump_map().await;
@@ -1061,7 +1061,7 @@ impl CGWConnectionServer {
devices_cache.dump_devices_cache();
}
let topo_map = CGWUcentralTopologyMap::get_ref();
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_device(&device_mac).await;
topo_map.debug_dump_map().await;

View File

@@ -1,49 +1,681 @@
use base64::prelude::*;
use eui48::MacAddress;
use flate2::read::ZlibDecoder;
use serde_json::Value;
use serde_json::{Map, Value};
use std::io::prelude::*;
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};
use crate::cgw_ucentral_parser::{
CGWUCentralEvent, CGWUCentralEventConnect, CGWUCentralEventConnectParamsCaps,
CGWUCentralEventLog, CGWUCentralEventReply, CGWUCentralEventState,
CGWUCentralEventStateLLDPData, CGWUCentralEventStateLLDPDataLinks, CGWUCentralEventType,
CGWUcentralJRPCMessage,
CGWUCentralEventLog, CGWUCentralEventRealtimeEvent, CGWUCentralEventRealtimeEventType,
CGWUCentralEventRealtimeEventWClientJoin, CGWUCentralEventRealtimeEventWClientLeave,
CGWUCentralEventReply, CGWUCentralEventState, CGWUCentralEventStateClients,
CGWUCentralEventStateClientsData, CGWUCentralEventStateClientsType,
CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventType,
CGWUCentralJRPCMessage,
};
fn parse_lldp_data(data: &Value) -> Vec<CGWUCentralEventStateLLDPDataLinks> {
let mut links: Vec<CGWUCentralEventStateLLDPDataLinks> = Vec::new();
fn parse_lldp_data(lldp_peers: &Map<String, Value>, links: &mut Vec<CGWUCentralEventStateLinks>) {
let directions = [
(lldp_peers["upstream"].as_object().unwrap(), false),
(lldp_peers["downstream"].as_object().unwrap(), true),
];
if let Value::Object(map) = data {
let directions = [
(map["upstream"].as_object().unwrap(), false),
(map["downstream"].as_object().unwrap(), true),
];
for (d, is_downstream) in directions {
for (key, value) in d {
let data = value.as_array().unwrap()[0].as_object().unwrap();
let local_port = key.to_string().replace("WAN", "eth0").replace("LAN", "eth");
let remote_serial = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap();
let remote_port = data["port_id"].as_str().unwrap().to_string();
for (d, is_downstream) in directions {
for (key, value) in d {
let data = value.as_array().unwrap()[0].as_object().unwrap();
links.push(CGWUCentralEventStateLinks {
local_port,
remote_serial,
remote_port,
is_downstream,
});
}
}
}
let local_port = key.to_string().replace("WAN", "eth0").replace("LAN", "eth");
let remote_mac = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap();
let remote_port = data["port_id"].as_str().unwrap().to_string();
fn parse_wireless_ssids_info(
ssids: &Vec<Value>,
ssids_map: &mut HashMap<String, (String, String)>,
) {
for s in ssids {
if let Value::Object(ssid) = s {
if !ssid.contains_key("band")
|| !ssid.contains_key("bssid")
|| !ssid.contains_key("ssid")
{
continue;
}
links.push(CGWUCentralEventStateLLDPDataLinks {
local_port,
remote_mac,
remote_port,
is_downstream,
});
let band_value = {
if let Value::String(band_str) = &ssid["band"] {
band_str.clone()
} else {
continue;
}
};
let bssid_value = {
if let Value::String(bssid_str) = &ssid["bssid"] {
bssid_str.clone()
} else {
continue;
}
};
let ssid_value = {
if let Value::String(ssid_str) = &ssid["ssid"] {
ssid_str.clone()
} else {
continue;
}
};
ssids_map.insert(bssid_value, (ssid_value, band_value));
}
}
}
fn parse_wireless_clients_data(
ssids: &Vec<Value>,
links: &mut Vec<CGWUCentralEventStateClients>,
upstream_ifaces: &Vec<String>,
ssids_map: &HashMap<String, (String, String)>,
timestamp: i64,
) {
for s in ssids {
if let Value::Object(ssid) = s {
let local_port = {
if let Value::String(port) = &ssid["iface"] {
port.clone()
} else {
warn!("Failed to retrieve local_port for {:?}", ssid);
continue;
}
};
// Upstream WAN iface? Not supported
if upstream_ifaces.iter().any(|i| *i == local_port) {
warn!(
"Skipped ssid wireless client info {:?} for upstream interface",
ssid
);
continue;
}
if !ssid.contains_key("associations") {
warn!("Failed to retrieve associations for {local_port}");
continue;
}
if let Value::Array(associations) = &ssid["associations"] {
for association in associations {
let mut ts = 0i64;
if let Value::Object(map) = association {
if !map.contains_key("station") || !map.contains_key("connected") {
continue;
}
let bssid_value = {
if let Value::String(bssid) = &map["bssid"] {
bssid.clone()
} else {
continue;
}
};
let (ssid, band) = {
if let Some(v) = ssids_map.get(&bssid_value) {
(v.0.clone(), v.1.clone())
} else {
warn!("Failed to get ssid/band value for {bssid_value}");
continue;
}
};
let remote_serial =
MacAddress::from_str(map["station"].as_str().unwrap()).unwrap();
// Time, for how long this connection's been associated
// with the AP that reports this data.
if let Value::Number(t) = &map["connected"] {
ts = t.as_i64().unwrap();
}
links.push(CGWUCentralEventStateClients {
client_type: CGWUCentralEventStateClientsType::Wireless(
// Track timestamp of initial connection:
// if we receive state evt <now>, substract
// connected since from it, to get
// original connection timestamp.
timestamp - ts,
ssid,
band,
),
local_port: local_port.clone(),
remote_serial,
// TODO: rework remote_port to have Band, RSSI, chan etc
// for an edge.
remote_port: "<Wireless-client>".to_string(),
is_downstream: true,
});
}
}
}
}
}
links
}
pub fn cgw_ucentral_ap_parse_message(message: &String) -> Result<CGWUCentralEvent, &'static str> {
let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) {
fn parse_wired_clients_data(
clients: &Vec<Value>,
links: &mut Vec<CGWUCentralEventStateClients>,
upstream_ifaces: &Vec<String>,
timestamp: i64,
) {
for client in clients {
let local_port = {
if let Value::Array(arr) = &client["ports"] {
match arr[0].as_str() {
Some(s) => s.to_string(),
None => {
warn!(
"Failed to get clients port string for {:?}, skipping",
client
);
continue;
}
}
} else {
warn!("Failed to parse clients port for {:?}, skipping", client);
continue;
}
};
// Skip wireless clients info
if local_port.contains("wlan") || local_port.contains("WLAN") {
continue;
}
// TODO: W/A for now: ignore any upstream-reported clients,
// because it includes ARP neighbours and clients.
// The logic to process uplink neighbors properly should be
// much more complicated and should be treated as a
// separate case.
// This logic also overlaps the uplink switch's neighbor
// detection logic (LLDP, ARP, FDB etc), and building topo map
// based purely on AP's input could result in invalid
// map formation.
if upstream_ifaces.iter().any(|i| *i == local_port) {
continue;
}
let remote_serial = MacAddress::from_str(client["mac"].as_str().unwrap()).unwrap();
links.push(CGWUCentralEventStateClients {
// Wired clients don't have <connected since> data.
// Treat <now> as latest connected ts.
client_type: CGWUCentralEventStateClientsType::Wired(timestamp),
local_port,
remote_serial,
// TODO: rework remote_port to have speed / duplex characteristics
// for an edge.
remote_port: "<Wired-client>".to_string(),
is_downstream: true,
});
}
}
fn parse_interface_data(
interface: &Map<String, Value>,
links: &mut Vec<CGWUCentralEventStateClients>,
upstream_ifaces: &Vec<String>,
timestamp: i64,
) {
if interface.contains_key("clients") {
if let Value::Array(clients) = &interface["clients"] {
parse_wired_clients_data(&clients, links, upstream_ifaces, timestamp);
}
}
if interface.contains_key("ssids") {
let mut ssids_map: HashMap<String, (String, String)> = HashMap::new();
if let Value::Array(ssids) = &interface["ssids"] {
parse_wireless_ssids_info(ssids, &mut ssids_map);
parse_wireless_clients_data(&ssids, links, upstream_ifaces, &ssids_map, timestamp);
}
}
}
fn parse_link_state_data(
link_state: &Map<String, Value>,
upstream_ifaces: &mut Vec<String>,
downstream_ifaces: &mut Vec<String>,
) {
if let Value::Object(upstream_obj) = &link_state["upstream"] {
for (k, _v) in upstream_obj.iter() {
upstream_ifaces.push(k.to_string());
}
}
if let Value::Object(downstream_obj) = &link_state["downstream"] {
for (k, _v) in downstream_obj.iter() {
downstream_ifaces.push(k.to_string());
}
}
}
fn parse_state_event_data(
map: CGWUCentralJRPCMessage,
timestamp: i64,
) -> Result<CGWUCentralEvent, &'static str> {
if !map.contains_key("params") {
return Err("Invalid state event received: params is missing");
}
let params = &map["params"];
if let Value::String(compressed_data) = &params["compress_64"] {
let decoded_data = match BASE64_STANDARD.decode(compressed_data) {
Ok(d) => d,
Err(e) => {
warn!("Failed to decode base64+zip state evt {e}");
return Err("Failed to decode base64+zip state evt");
}
};
let mut d = ZlibDecoder::new(&decoded_data[..]);
let mut unzipped_data = String::new();
if let Err(e) = d.read_to_string(&mut unzipped_data) {
warn!("Failed to decompress decrypted state message {e}");
return Err("Failed to decompress decrypted state message");
}
let state_map: CGWUCentralJRPCMessage = match serde_json::from_str(&unzipped_data) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input state message {e}");
return Err("Failed to parse input state message");
}
};
let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap();
if let Value::Object(state_map) = &state_map["state"] {
let mut lldp_links: Vec<CGWUCentralEventStateLinks> = Vec::new();
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();
if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(&v, &mut lldp_links);
}
}
let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();
if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
}
}
if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
&iface,
&mut clients_links,
&mut upstream_ifaces,
timestamp,
);
}
}
}
let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
timestamp,
local_mac: serial.clone(),
lldp_data: CGWUCentralEventStateLLDPData { links: lldp_links },
clients_data: CGWUCentralEventStateClientsData {
links: clients_links,
},
}),
};
return Ok(state_event);
}
return Err("Parsed, decompressed state message but failed to find state object");
} else if let Value::Object(state_map) = &params["state"] {
let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap();
let mut lldp_links: Vec<CGWUCentralEventStateLinks> = Vec::new();
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();
if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(&v, &mut lldp_links);
}
}
let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();
if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
}
}
if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
&iface,
&mut clients_links,
&mut upstream_ifaces,
timestamp,
);
}
}
}
let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
timestamp,
local_mac: serial,
lldp_data: CGWUCentralEventStateLLDPData { links: lldp_links },
clients_data: CGWUCentralEventStateClientsData {
links: clients_links,
},
}),
};
return Ok(state_event);
}
return Err("Failed to parse state event");
}
fn parse_realtime_event_data(
map: CGWUCentralJRPCMessage,
timestamp: i64,
) -> Result<CGWUCentralEvent, &'static str> {
if !map.contains_key("params") {
return Err("Invalid event received: params is missing");
}
let params = &map["params"];
let serial = {
match params["serial"].as_str() {
Some(v) => match MacAddress::from_str(v) {
Ok(serial) => serial,
Err(_) => {
return Err("Invalid event received: serial is an invalid MAC address");
}
},
None => {
return Err("Invalid event received: serial is missing");
}
}
};
let events = match &params["data"]["event"] {
Value::Array(events) => events,
_ => {
return Err("Invalid event received: data:event missing");
}
};
if events.len() < 2 {
warn!("Received malformed event: number of event values < 2");
return Err("Received malformed event: number of event values < 2");
}
// We don't actually care about this TS, but it's a format-abiding requirement
// put onto the AP's (we expect ts to be there).
match &events[0] {
Value::Number(ts) => {
if let None = ts.as_i64() {
warn!("Received malformed event: missing timestamp");
return Err("Received malformed event: missing timestamp");
}
}
_ => {
warn!("Received malformed event: missing timestamp");
return Err("Received malformed event: missing timestamp");
}
};
let event_data = match &events[1] {
Value::Object(v) => v,
_ => {
warn!("Received malformed event: missing timestamp");
return Err("Received malformed event: missing timestamp");
}
};
if !event_data.contains_key("type") {
warn!("Received malformed event: missing type");
return Err("Received malformed event: missing type");
}
let evt_type = match &event_data["type"] {
Value::String(t) => t,
_ => {
warn!("Received malformed event: type is of wrongful underlying format/type");
return Err("Received malformed event: type is of wrongful underlying format/type");
}
};
let evt_payload = match &event_data["payload"] {
Value::Object(d) => d,
_ => {
warn!("Received malformed event: payload is of wrongful underlying format/type");
return Err("Received malformed event: payload is of wrongful underlying format/type");
}
};
match evt_type.as_str() {
"client.join" => {
if !evt_payload.contains_key("band")
|| !evt_payload.contains_key("client")
|| !evt_payload.contains_key("ssid")
|| !evt_payload.contains_key("rssi")
|| !evt_payload.contains_key("channel")
{
warn!("Received malformed client.join event: band, rssi, ssid, channel and client are required");
return Err("Received malformed client.join event: band, rssi, ssid, channel and client are required");
}
let band = {
match &evt_payload["band"] {
Value::String(s) => s,
_ => {
warn!("Received malformed client.join event: band is of wrongful underlying format/type");
return Err(
"Received malformed client.join event: band is of wrongful underlying format/type",
);
}
}
};
let client = {
match &evt_payload["client"] {
Value::String(s) => match MacAddress::from_str(s.as_str()) {
Ok(v) => v,
Err(_) => {
warn!("Received malformed client.join event: client is a malformed MAC address");
return Err(
"Received malformed client.join event: client is a malformed MAC address",
);
}
},
_ => {
warn!("Received malformed client.join event: client is of wrongful underlying format/type");
return Err(
"Received malformed client.join event: client is of wrongful underlying format/type",
);
}
}
};
let ssid = {
match &evt_payload["ssid"] {
Value::String(s) => s,
_ => {
warn!("Received malformed client.join event: ssid is of wrongful underlying format/type");
return Err(
"Received malformed client.join event: ssid is of wrongful underlying format/type",
);
}
}
};
let rssi = {
match &evt_payload["rssi"] {
Value::Number(s) => match s.as_i64() {
Some(v) => v,
None => {
warn!("Received malformed client.join event: rssi is NaN?");
return Err("Received malformed client.join event: rssi is NaN?");
}
},
_ => {
warn!("Received malformed client.join event: rssi is of wrongful underlying format/type");
return Err(
"Received malformed client.join event: rssi is of wrongful underlying format/type",
);
}
}
};
let channel = {
match &evt_payload["channel"] {
Value::Number(s) => match s.as_u64() {
Some(v) => v,
None => {
warn!("Received malformed client.join event: channel is NaN?");
return Err("Received malformed client.join event: channel is NaN?");
}
},
_ => {
warn!("Received malformed client.join event: channel is of wrongful underlying format/type");
return Err(
"Received malformed client.join event: channel is of wrongful underlying format/type",
);
}
}
};
return Ok(CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::RealtimeEvent(CGWUCentralEventRealtimeEvent {
// For client.join the timestamp is <now> (whenever event's
// been received)
timestamp,
evt_type: CGWUCentralEventRealtimeEventType::WirelessClientJoin(
CGWUCentralEventRealtimeEventWClientJoin {
client,
band: band.to_string(),
ssid: ssid.to_string(),
rssi,
channel,
},
),
}),
});
}
"client.leave" => {
if !evt_payload.contains_key("band")
|| !evt_payload.contains_key("client")
|| !evt_payload.contains_key("connected_time")
{
warn!("Received malformed client.leave event: client, band and connected_time is required");
return Err("Received malformed client.leave event: client, band and connected_time is required");
}
let band = {
match &evt_payload["band"] {
Value::String(s) => s,
_ => {
warn!("Received malformed client.leave event: band is of wrongful underlying format/type");
return Err(
"Received malformed client.leave event: band is of wrongful underlying format/type",
);
}
}
};
let client = {
match &evt_payload["client"] {
Value::String(s) => match MacAddress::from_str(s.as_str()) {
Ok(v) => v,
Err(_) => {
warn!("Received malformed client.leave event: client is a malformed MAC address");
return Err(
"Received malformed client.leave event: client is a malformed MAC address",
);
}
},
_ => {
warn!("Received malformed client.leave event: client is of wrongful underlying format/type");
return Err(
"Received malformed client.leave event: client is of wrongful underlying format/type",
);
}
}
};
let connected_time = {
match &evt_payload["connected_time"] {
Value::Number(s) => match s.as_i64() {
Some(v) => v,
None => {
warn!("Received malformed client.leave event: connected_time is NaN?");
return Err(
"Received malformed client.leave event: connected_time is NaN?",
);
}
},
_ => {
warn!("Received malformed client.leave event: connected_time is of wrongful underlying format/type");
return Err(
"Received malformed client.leave event: connected_time is of wrongful underlying format/type",
);
}
}
};
return Ok(CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::RealtimeEvent(CGWUCentralEventRealtimeEvent {
// For client.leave the timestamp is: <now> substracted with
// the time device's been connected to the AP,
// which translates into:
// timestamp is equal to whenever connection's been
// established originally.
// And in case if <newer> client.join/state message
// is already registered within the CGW,
// this event will be simply dropped.
timestamp: timestamp - connected_time,
evt_type: CGWUCentralEventRealtimeEventType::WirelessClientLeave(
CGWUCentralEventRealtimeEventWClientLeave {
client,
band: band.to_string(),
},
),
}),
});
}
_ => {
warn!("Received unknown event: {evt_type}");
return Err("Received unknown event");
}
}
}
pub fn cgw_ucentral_ap_parse_message(
message: &String,
timestamp: i64,
) -> Result<CGWUCentralEvent, &'static str> {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input json {e}");
@@ -96,63 +728,9 @@ pub fn cgw_ucentral_ap_parse_message(message: &String) -> Result<CGWUCentralEven
return Ok(connect_event);
} else if method == "state" {
let params = map.get("params").expect("Params are missing");
if let Value::String(compressed_data) = &params["compress_64"] {
let decoded_data = match BASE64_STANDARD.decode(compressed_data) {
Ok(d) => d,
Err(e) => {
warn!("Failed to decode base64+zip state evt {e}");
return Err("Failed to decode base64+zip state evt");
}
};
let mut d = ZlibDecoder::new(&decoded_data[..]);
let mut unzipped_data = String::new();
if let Err(e) = d.read_to_string(&mut unzipped_data) {
warn!("Failed to decompress decrypted state message {e}");
return Err("Failed to decompress decrypted state message");
}
let state_map: CGWUcentralJRPCMessage = match serde_json::from_str(&unzipped_data) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input state message {e}");
return Err("Failed to parse input state message");
}
};
let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap();
if let Value::Object(state_map) = &state_map["state"] {
let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
lldp_data: CGWUCentralEventStateLLDPData {
local_mac: serial,
links: parse_lldp_data(&state_map["lldp-peers"]),
},
}),
};
return Ok(state_event);
}
return Err("Parsed, decompressed state message but failed to find state object");
} else if let Value::Object(state_map) = &params["state"] {
let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap();
let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
lldp_data: CGWUCentralEventStateLLDPData {
local_mac: serial,
links: parse_lldp_data(&state_map["lldp-peers"]),
},
}),
};
return Ok(state_event);
}
return parse_state_event_data(map, timestamp);
} else if method == "event" {
return parse_realtime_event_data(map, timestamp);
}
} else if map.contains_key("result") {
if !map.contains_key("id") {

View File

@@ -11,7 +11,7 @@ use crate::{
cgw_ucentral_switch_parser::cgw_ucentral_switch_parse_message,
};
pub type CGWUcentralJRPCMessage = Map<String, Value>;
pub type CGWUCentralJRPCMessage = Map<String, Value>;
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventLog {
@@ -37,29 +37,50 @@ pub struct CGWUCentralEventConnect {
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventStateLLDPDataLinks {
pub struct CGWUCentralEventStateLinks {
pub local_port: String,
#[serde(skip)]
pub remote_mac: MacAddress,
pub remote_serial: MacAddress,
pub remote_port: String,
pub is_downstream: bool,
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum CGWUCentralEventStateClientsType {
Wired(i64),
// Ssid, Band
Wireless(i64, String, String),
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventStateClients {
pub client_type: CGWUCentralEventStateClientsType,
pub local_port: String,
#[serde(skip)]
pub remote_serial: MacAddress,
pub remote_port: String,
pub is_downstream: bool,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventStateLLDPData {
// Parsed State LLDP data:
// mac address of the device reporting the LLDP data
#[serde(skip)]
pub local_mac: MacAddress,
// links reported by the device:
// local port, remote mac, remote port
pub links: Vec<CGWUCentralEventStateLLDPDataLinks>,
pub links: Vec<CGWUCentralEventStateLinks>,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventStateClientsData {
// links reported by the device (wired and wireless):
pub links: Vec<CGWUCentralEventStateClients>,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventState {
// mac address of the device reporting the state evt
pub local_mac: MacAddress,
pub timestamp: i64,
pub lldp_data: CGWUCentralEventStateLLDPData,
pub clients_data: CGWUCentralEventStateClientsData,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
@@ -67,6 +88,35 @@ pub struct CGWUCentralEventReply {
pub id: u64,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventRealtimeEventWClientJoin {
pub client: MacAddress,
pub band: String,
pub ssid: String,
pub rssi: i64,
pub channel: u64,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventRealtimeEventWClientLeave {
pub client: MacAddress,
pub band: String,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub enum CGWUCentralEventRealtimeEventType {
WirelessClientJoin(CGWUCentralEventRealtimeEventWClientJoin),
WirelessClientLeave(CGWUCentralEventRealtimeEventWClientLeave),
#[default]
None,
}
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CGWUCentralEventRealtimeEvent {
pub evt_type: CGWUCentralEventRealtimeEventType,
pub timestamp: i64,
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum CGWUCentralEventType {
Connect(CGWUCentralEventConnect),
@@ -83,6 +133,7 @@ pub enum CGWUCentralEventType {
Ping,
Recovery,
VenueBroadcast,
RealtimeEvent(CGWUCentralEventRealtimeEvent),
Reply(CGWUCentralEventReply),
}
@@ -180,7 +231,7 @@ pub fn cgw_ucentral_parse_connect_event(
return Err("Message to string cast failed");
};
let map: CGWUcentralJRPCMessage = match serde_json::from_str(&msg) {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(&msg) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input json {e}");
@@ -230,7 +281,7 @@ pub fn cgw_ucentral_parse_connect_event(
pub fn cgw_ucentral_parse_command_message(
message: &String,
) -> Result<CGWUCentralCommand, &'static str> {
let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input json {e}");
@@ -282,9 +333,10 @@ pub fn cgw_ucentral_parse_command_message(
pub fn cgw_ucentral_event_parse(
device_type: &CGWDeviceType,
message: &String,
timestamp: i64,
) -> Result<CGWUCentralEvent, &'static str> {
match device_type {
CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(&message),
CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(&message),
CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(&message, timestamp),
CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(&message, timestamp),
}
}

View File

@@ -3,15 +3,13 @@ use serde_json::Value;
use std::str::FromStr;
use crate::cgw_ucentral_parser::{
CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateLLDPData,
CGWUCentralEventStateLLDPDataLinks, CGWUCentralEventType, CGWUcentralJRPCMessage,
CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateClientsData,
CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventType,
CGWUCentralJRPCMessage,
};
fn parse_lldp_data(
data: &Value,
upstream_port: Option<String>,
) -> Vec<CGWUCentralEventStateLLDPDataLinks> {
let mut links: Vec<CGWUCentralEventStateLLDPDataLinks> = Vec::new();
fn parse_lldp_data(data: &Value, upstream_port: Option<String>) -> Vec<CGWUCentralEventStateLinks> {
let mut links: Vec<CGWUCentralEventStateLinks> = Vec::new();
if let Value::Object(map) = data {
let directions = [
@@ -24,7 +22,7 @@ fn parse_lldp_data(
let data = value.as_array().unwrap()[0].as_object().unwrap();
let local_port = key.to_string();
let remote_mac = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap();
let remote_serial = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap();
let remote_port = data["port"].as_str().unwrap().to_string();
let is_downstream: bool = {
if let Some(ref port) = upstream_port {
@@ -34,9 +32,9 @@ fn parse_lldp_data(
}
};
links.push(CGWUCentralEventStateLLDPDataLinks {
links.push(CGWUCentralEventStateLinks {
local_port,
remote_mac,
remote_serial,
remote_port,
is_downstream,
});
@@ -49,8 +47,9 @@ fn parse_lldp_data(
pub fn cgw_ucentral_switch_parse_message(
message: &String,
timestamp: i64,
) -> Result<CGWUCentralEvent, &'static str> {
let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input json {e}");
@@ -90,10 +89,12 @@ pub fn cgw_ucentral_switch_parse_message(
if let Value::Object(state_map) = &params["state"] {
let serial = MacAddress::from_str(params["serial"].as_str().unwrap()).unwrap();
let mut upstream_port: Option<String> = None;
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.get(0) {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
if state_map.contains_key("default-gateway") {
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.get(0) {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
}
}
}
}
@@ -101,10 +102,12 @@ pub fn cgw_ucentral_switch_parse_message(
let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
timestamp,
local_mac: serial,
lldp_data: CGWUCentralEventStateLLDPData {
local_mac: serial,
links: parse_lldp_data(&state_map["lldp-peers"], upstream_port),
},
clients_data: CGWUCentralEventStateClientsData { links: Vec::new() },
}),
};

View File

@@ -1,6 +1,9 @@
use crate::{
cgw_device::CGWDeviceType,
cgw_ucentral_parser::{CGWUCentralEvent, CGWUCentralEventType},
cgw_ucentral_parser::{
CGWUCentralEvent, CGWUCentralEventRealtimeEventType, CGWUCentralEventStateClientsType,
CGWUCentralEventType,
},
};
use petgraph::dot::{Config, Dot};
use petgraph::{
@@ -15,34 +18,83 @@ use tokio::sync::RwLock;
use eui48::MacAddress;
type WirelessClientBand = String;
type WirelessClientSsid = String;
// One 'slice' / part of edge (Mac + port);
// To make a proper complete edge two parts needed:
// SRC -> DST
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CGWUcentralTopologySubEdge {
pub mac: MacAddress,
pub port: String,
pub enum CGWUCentralTopologySubEdgePort {
// Used in <SRC> subedge
PhysicalWiredPort(String),
WirelessPort,
// Used in <DST> subedge
// Wired client reported by AP (no dst port info available)
// TODO: Duplex speed?
WiredClient,
// Wieless client reported by AP: SSID + Band
WirelessClient(WirelessClientSsid, WirelessClientBand),
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CGWUCentralTopologySubEdge {
pub serial: MacAddress,
pub port: CGWUCentralTopologySubEdgePort,
}
// Complete edge consisting of SRC -> DST 'sub-edges'
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CGWUcentralTopologyEdge(CGWUcentralTopologySubEdge, CGWUcentralTopologySubEdge);
pub struct CGWUCentralTopologyEdge(CGWUCentralTopologySubEdge, CGWUCentralTopologySubEdge);
#[derive(Debug)]
struct CGWUcentralTopologyMapData {
node_idx_map: HashMap<MacAddress, NodeIndex>,
edge_idx_map: HashMap<CGWUcentralTopologyEdge, EdgeIndex>,
graph: StableGraph<String, String>,
type EdgeCreationTimestamp = i64;
// We have to track the 'origin' of any node we add to topo map,
// because deletion decision should be made on the following basis:
// - direct WSS connection should be always kept in the topo map,
// and only erased when disconnect happens;
// - any nodes, that are added to topo map as 'clients' (lldp peers,
// wired and wireless clients, fdb info) should be deleted when the node
// that reported them gets deleted;
// however if 'client' node also exists in topo map, and is currently connected
// to CGW (WSS), then it should be left untouched;
#[derive(Debug, Clone)]
enum CGWUCentralTopologyMapNodeOrigin {
UCentralDevice,
StateLLDPPeer,
StateWiredWireless,
StateFDB,
}
// We have to track the 'origin' of any edge we add to topo map,
// because deletion decision should be made on the following basis:
// - 'client.leave' should remove edge (only if join timestamp < leave timestamp);
// - 'client.join' should remove edge and create (potentially with new SRC node)
// a new edge with device/node that reports this event.
// Only, if join timestamp > <previous> join timestamp (state evt, realtime join evt)
#[derive(Debug, Clone)]
enum CGWUCentralTopologyMapEdgeOrigin {
StateLLDPPeer,
StateWiredWireless(EdgeCreationTimestamp),
StateFDB,
}
#[derive(Debug)]
pub struct CGWUcentralTopologyMap {
data: Arc<RwLock<CGWUcentralTopologyMapData>>,
struct CGWUCentralTopologyMapData {
node_idx_map: HashMap<MacAddress, (NodeIndex, CGWUCentralTopologyMapNodeOrigin)>,
edge_idx_map: HashMap<CGWUCentralTopologyEdge, (EdgeIndex, CGWUCentralTopologyMapEdgeOrigin)>,
graph: StableGraph<MacAddress, String>,
}
#[derive(Debug)]
pub struct CGWUCentralTopologyMap {
data: Arc<RwLock<CGWUCentralTopologyMapData>>,
}
lazy_static! {
pub static ref CGW_UCENTRAL_TOPOLOGY_MAP: CGWUcentralTopologyMap = CGWUcentralTopologyMap {
data: Arc::new(RwLock::new(CGWUcentralTopologyMapData {
pub static ref CGW_UCENTRAL_TOPOLOGY_MAP: CGWUCentralTopologyMap = CGWUCentralTopologyMap {
data: Arc::new(RwLock::new(CGWUCentralTopologyMapData {
node_idx_map: HashMap::new(),
edge_idx_map: HashMap::new(),
graph: StableGraph::new(),
@@ -50,19 +102,23 @@ lazy_static! {
};
}
impl CGWUcentralTopologyMap {
impl CGWUCentralTopologyMap {
pub fn get_ref() -> &'static Self {
&CGW_UCENTRAL_TOPOLOGY_MAP
}
pub async fn insert_device(self: &Self, mac: &MacAddress) {
pub async fn insert_device(self: &Self, serial: &MacAddress) {
let mut lock = self.data.write().await;
Self::add_node(&mut lock, mac);
Self::add_node(
&mut lock,
serial,
CGWUCentralTopologyMapNodeOrigin::UCentralDevice,
);
}
pub async fn remove_device(self: &Self, mac: &MacAddress) {
pub async fn remove_device(self: &Self, serial: &MacAddress) {
let mut lock = self.data.write().await;
Self::remove_node(&mut lock, mac);
Self::remove_node(&mut lock, serial);
}
pub async fn process_state_message(
@@ -73,21 +129,33 @@ impl CGWUcentralTopologyMap {
let mut lock = self.data.write().await;
if let CGWUCentralEventType::State(s) = evt.evt_type {
// To make sure any leftovers are handled, node that reports
// state message is getting purged and recreated:
// since state message hold <all> necessary information,
// we can safely purge all edge info and recreate it from
// the state message. Any missed / deleted by mistake
// edges will appear on the next iteration of state / realtime event
// processing.
Self::remove_node(&mut lock, &s.local_mac);
// Re-create node with origin being UCentralDevice, as this
// device is directly connected to the CGW.
Self::add_node(
&mut lock,
&s.local_mac,
CGWUCentralTopologyMapNodeOrigin::UCentralDevice,
);
// Start with LLDP info processing
for link in s.lldp_data.links {
let subedge_src = CGWUcentralTopologySubEdge {
mac: s.lldp_data.local_mac,
port: link.local_port,
let subedge_src = CGWUCentralTopologySubEdge {
serial: s.local_mac,
port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort(link.local_port),
};
let subedge_dst = CGWUcentralTopologySubEdge {
mac: link.remote_mac,
port: link.remote_port,
let subedge_dst = CGWUCentralTopologySubEdge {
serial: link.remote_serial,
port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort(link.remote_port),
};
// Any neighbour seen in LLDP is added to the graph.
// Whenever parent (entity reporting the LLDP data)
// get's removed - neighbour nodes and connected
// edges will be purged.
Self::add_node(&mut lock, &link.remote_mac);
// No duplicates can exists, since it's LLDP data
// (both uCentral and underlying LLDP agents do not
@@ -99,25 +167,311 @@ impl CGWUcentralTopologyMap {
Self::remove_edge(&mut lock, &subedge_src);
Self::remove_edge(&mut lock, &subedge_dst);
// Any neighbour seen in LLDP is added to the graph.
// Whenever parent (entity reporting the LLDP data)
// get's removed - neighbour nodes and connected
// edges will be purged.
Self::add_node(
&mut lock,
&link.remote_serial,
CGWUCentralTopologyMapNodeOrigin::StateLLDPPeer,
);
if link.is_downstream {
Self::add_edge(&mut lock, CGWUcentralTopologyEdge(subedge_src, subedge_dst));
Self::add_edge(
&mut lock,
CGWUCentralTopologyEdge(subedge_src, subedge_dst),
CGWUCentralTopologyMapEdgeOrigin::StateLLDPPeer,
);
} else {
Self::add_edge(&mut lock, CGWUcentralTopologyEdge(subedge_dst, subedge_src));
Self::add_edge(
&mut lock,
CGWUCentralTopologyEdge(subedge_dst, subedge_src),
CGWUCentralTopologyMapEdgeOrigin::StateLLDPPeer,
);
}
}
// Clients data second iteration:
// add all nodes seen in clients;
// add new edges;
for link in &s.clients_data.links {
// Treat state timestamp as edge-creation timestamp only for
// events that do not report explicit connection timestamp
// (no association establishment timestamp for wired clients,
// however present for wireless for example).
let mut link_timestamp = s.timestamp;
let (subedge_src, subedge_dst) = {
if let CGWUCentralEventStateClientsType::Wired(_) = link.client_type {
(
CGWUCentralTopologySubEdge {
serial: s.local_mac,
port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort(
link.local_port.clone(),
),
},
CGWUCentralTopologySubEdge {
serial: link.remote_serial,
// TODO: Duplex speed?
port: CGWUCentralTopologySubEdgePort::WiredClient,
},
)
} else if let CGWUCentralEventStateClientsType::Wireless(ts, ssid, band) =
&link.client_type
{
// Since wireless association explicitly reports the
// timestamp for when link's been established, we can
// use this value reported from AP.
// For any other case (LLDP, wired), we use
// the event's base timestamp value;
link_timestamp = *ts;
(
CGWUCentralTopologySubEdge {
serial: s.local_mac,
port: CGWUCentralTopologySubEdgePort::WirelessPort,
},
CGWUCentralTopologySubEdge {
serial: link.remote_serial,
port: CGWUCentralTopologySubEdgePort::WirelessClient(
ssid.clone(),
band.clone(),
),
},
)
} else {
continue;
}
};
// In case when client silently migrates from AP1 to AP2,
// we have to explicitly remove that <edge> from AP1,
// and 'migrate' it to AP2.
// Do this only using <subedge_dst>, to make sure
// we clear only unique destination (wifi client on band X,
// for example) edge counterparts.
// NOTE: deleting subedge will remove both SRC and DST
// from map, as map stores them separately.
Self::remove_edge(&mut lock, &subedge_dst);
Self::add_node(
&mut lock,
&link.remote_serial,
CGWUCentralTopologyMapNodeOrigin::StateWiredWireless,
);
if link.is_downstream {
Self::add_edge(
&mut lock,
CGWUCentralTopologyEdge(subedge_src, subedge_dst),
CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(link_timestamp),
);
} else {
Self::add_edge(
&mut lock,
CGWUCentralTopologyEdge(subedge_dst, subedge_src),
CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(link_timestamp),
);
}
}
}
}
pub async fn process_device_topology_event(self: &Self) {}
pub async fn process_device_topology_event(
self: &Self,
_device_type: &CGWDeviceType,
evt: CGWUCentralEvent,
) {
struct ExistingEdge {
idx: EdgeIndex,
timestamp: EdgeCreationTimestamp,
key: CGWUCentralTopologyEdge,
}
let mut lock = self.data.write().await;
let mut existing_edge: Option<ExistingEdge> = None;
fn add_node(data: &mut CGWUcentralTopologyMapData, node_mac: &MacAddress) -> NodeIndex {
match data.node_idx_map.get(node_mac) {
if let CGWUCentralEventType::RealtimeEvent(rt) = evt.evt_type {
if let CGWUCentralEventRealtimeEventType::WirelessClientJoin(rt_j) = &rt.evt_type {
for key in lock.edge_idx_map.keys() {
// Try to find <existing> edge:
// we're looking for an edge with wireless client
// (<mac>) with specific (<band>) properties.
// However, the check is global:
// We do not care <which> AP reported the client initially:
// since the new client can appear on any given AP that
// is connected to us, we have to make sure that if
// AP2 receives client.join, and client serial is already
// associated with AP1, the connection edge between
// AP1 and <client.serial> should be purged,
// and then assigned to AP2.
//
// This only applies, however, to the join message.
// Late-leave events should be ignored, in case if
// client appears on new/other AP.
if let CGWUCentralTopologySubEdgePort::WirelessClient(_, dst_band) = &key.1.port
{
if key.1.serial == rt_j.client && *dst_band == *rt_j.band {
if let Some((edge_idx, edge_origin)) = lock.edge_idx_map.get(key) {
if let CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(
edge_timestamp,
) = edge_origin
{
existing_edge = Some(ExistingEdge {
idx: *edge_idx,
timestamp: edge_timestamp.clone(),
key: key.to_owned(),
});
break;
}
}
}
}
}
if let Some(e) = existing_edge {
// New client joined, and new event timestamp is bigger (newer):
// - delete existing edge from map;
// - update graph;
if rt.timestamp > e.timestamp {
let _ = lock.graph.remove_edge(e.idx);
// Remove SRC (tuple idx 0 == src) -> DST (idx 1 == dst) edge
let mut edge = CGWUCentralTopologyEdge(e.key.0, e.key.1);
let _ = lock.edge_idx_map.remove(&edge);
// We do not delete the leaf-disconnected bode,
// as we will try to recreate it later on anyways.
// Remove DST (tuple idx 1 == dst) -> SRC (idx 0 == src) edge
edge = CGWUCentralTopologyEdge(edge.1, edge.0);
let _ = lock.edge_idx_map.remove(&edge);
} else {
warn!(
"Received late join event: event ts {:?} vs existing edge ts {:?}",
rt.timestamp, e.timestamp
);
// New event is a late-reported / processed event;
// We can safely skip it;
return;
}
}
// Now simply update internall state:
// - create node (if doesnt exist already)
// - create edge;
// - update graph;
Self::add_node(
&mut lock,
&rt_j.client,
CGWUCentralTopologyMapNodeOrigin::StateWiredWireless,
);
let (subedge_src, subedge_dst) = {
(
CGWUCentralTopologySubEdge {
serial: evt.serial.clone(),
port: CGWUCentralTopologySubEdgePort::WirelessPort,
},
CGWUCentralTopologySubEdge {
serial: rt_j.client,
port: CGWUCentralTopologySubEdgePort::WirelessClient(
rt_j.ssid.clone(),
rt_j.band.clone(),
),
},
)
};
Self::add_edge(
&mut lock,
CGWUCentralTopologyEdge(subedge_src, subedge_dst),
CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(rt.timestamp),
);
} else if let CGWUCentralEventRealtimeEventType::WirelessClientLeave(rt_l) = rt.evt_type
{
for key in lock.edge_idx_map.keys() {
// Try to find <existing> edge:
// we're looking for an edge with wireless client
// (<mac>) with specific (<band>) properties, which is also
// reported by the <same> AP, as it's a leave event
// (AP1 can't expect us to delete existing edge, if AP2
// is already associated with this client)
if let CGWUCentralTopologySubEdgePort::WirelessClient(_, dst_band) = &key.1.port
{
if key.1.serial == rt_l.client && *dst_band == *rt_l.band &&
// Part that checks if AP that reports <client.leave> also
// is associated with this client.
// If not - it's a 'late' leave event that can be ignored.
key.0.serial == evt.serial
{
if let Some((edge_idx, edge_origin)) = lock.edge_idx_map.get(key) {
if let CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(
edge_timestamp,
) = edge_origin
{
existing_edge = Some(ExistingEdge {
idx: *edge_idx,
timestamp: edge_timestamp.clone(),
key: key.to_owned(),
});
break;
}
}
}
}
}
if let Some(e) = existing_edge {
// We still have to check whether this leave message
// is newer than the existing timestamp:
// It's possible that state + leave events were shuffled,
// in a way that leave gets processed only after state's
// processing's been completed.
// This results in a discardtion of the late leave event.
if rt.timestamp > e.timestamp {
let _ = lock.graph.remove_edge(e.idx);
// Remove SRC (tuple idx 0 == src) -> DST (idx 1 == dst) edge
let mut edge = CGWUCentralTopologyEdge(e.key.0, e.key.1);
let _ = lock.edge_idx_map.remove(&edge);
// Also remove dst node if it's a leaf-disconnected node
Self::remove_disconnected_leaf_node(&mut lock, &edge.1.serial);
// Remove DST (tuple idx 1 == dst) -> SRC (idx 0 == src) edge
edge = CGWUCentralTopologyEdge(edge.1, edge.0);
let _ = lock.edge_idx_map.remove(&edge);
} else {
warn!(
"Received late leave event: event ts {:?} vs existing edge ts {:?}",
rt.timestamp, e.timestamp
);
// New event is a late-reported / processed event;
// We can safely skip it;
return;
}
}
}
}
}
fn add_node(
data: &mut CGWUCentralTopologyMapData,
node_mac: &MacAddress,
origin: CGWUCentralTopologyMapNodeOrigin,
) -> NodeIndex {
match data.node_idx_map.get_mut(node_mac) {
None => {
let idx = data.graph.add_node(node_mac.to_hex_string());
let _ = data.node_idx_map.insert(node_mac.clone(), idx);
let idx = data.graph.add_node(*node_mac);
let _ = data.node_idx_map.insert(node_mac.clone(), (idx, origin));
idx
}
Some(idx) => *idx,
Some((idx, existing_origin)) => {
if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = existing_origin {
*idx
} else {
*existing_origin = origin;
*idx
}
}
}
// TODO: handle <already present in the map> case:
// this either means that we detected this node at some new
@@ -125,11 +479,56 @@ impl CGWUcentralTopologyMap {
// e.g. delete all connected edges, child nodes etc etc;
}
fn remove_node(data: &mut CGWUcentralTopologyMapData, node_mac: &MacAddress) {
if let Some(node) = data.node_idx_map.remove(node_mac) {
// Checks before removal, safe to call
fn remove_disconnected_leaf_node(data: &mut CGWUCentralTopologyMapData, node_mac: &MacAddress) {
let mut node_idx_to_remove: Option<NodeIndex> = None;
if let Some((node_idx, origin)) = data.node_idx_map.get(node_mac) {
// Skip this node, as it's origin is known from
// uCentral connection, not state data.
if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = origin {
debug!("Not removing disconnected leaf {:?} - reason: uCentral device (direct connection to CGW)", node_mac);
return;
}
let mut edges = data
.graph
// We're interested only if there are <incoming> edges for
// this (potentially) disconnected leaf-node
.neighbors_directed(*node_idx, Direction::Incoming)
.detach();
if let None = edges.next_edge(&data.graph) {
node_idx_to_remove = Some(*node_idx);
}
}
if let Some(node_idx) = node_idx_to_remove {
debug!("MAC {:?} is a disconnected leaf node, removing", node_mac);
data.node_idx_map.remove(node_mac);
data.graph.remove_node(node_idx);
}
}
fn remove_node(data: &mut CGWUCentralTopologyMapData, node_mac: &MacAddress) {
if let Some((node, _)) = data.node_idx_map.remove(node_mac) {
// 'Potential' list of nodes we can safely remove.
// Duplicates may exist, because multiple edges can originate
// from src node (AP, switch) to a single other node
// (for example client's connected both through
// the WiFi and the cable, or client's connected
// to the AP on multiple bands etc).
//
// Not every node from this list gets removed, as
// once again: node (client) can be connected to
// multiple APs at once on different bands,
// or client's seen for example both on WiFi
// and cable.
let mut nodes_to_remove: Vec<NodeIndex> = Vec::new();
let mut edges_to_remove: Vec<EdgeIndex> = Vec::new();
let mut map_keys_to_remove: Vec<CGWUcentralTopologyEdge> = Vec::new();
let edges = [
let mut map_edge_keys_to_remove: Vec<CGWUCentralTopologyEdge> = Vec::new();
let mut edges = [
data.graph
.neighbors_directed(node, Direction::Outgoing)
.detach(),
@@ -138,50 +537,138 @@ impl CGWUcentralTopologyMap {
.detach(),
];
for mut x in edges {
while let Some(edge) = x.next_edge(&data.graph) {
data.graph.remove_edge(edge);
edges_to_remove.push(edge);
while let Some(edge) = edges[0].next_edge(&data.graph) {
// We iterate over edges that are connected with this SRC
// node, and collect all the destination Node indexes,
// to check them afterwards whether they still have
// some edges connected to them.
// If not - we remove the nodes out off the internal map.
// NOTE: It's possible that two Websocket devices are
// connected and we'll try to remove DST node even though
// knowledge about this device's presence in our map
// originates from WSS connection, not state message.
// However, internal map also has meta information
// about the origin of appearence in map, hence
// it solves the issue.
// (if node.origin == WSS then <do not remove node>)
//
// NOTE: we do this only for <Outgoing> neighbors
// From treeview-graph perspective, we're clearing <leaf>
// nodes that originate from this <node_mac> device.
if let Some((_, node_dst)) = data.graph.edge_endpoints(edge) {
nodes_to_remove.push(node_dst);
}
data.graph.remove_edge(edge);
edges_to_remove.push(edge);
}
for (k, e) in &data.edge_idx_map {
for x in &edges_to_remove {
if x == e {
map_keys_to_remove.push(k.to_owned());
while let Some(edge) = edges[1].next_edge(&data.graph) {
data.graph.remove_edge(edge);
edges_to_remove.push(edge);
}
for node_idx in nodes_to_remove {
let mut node_edges = data
.graph
.neighbors_directed(node_idx, Direction::Incoming)
.detach();
// Check if at least one edge is connecting this
// Node; If not - purge it, but only if this node
// has been added through the means of State message
// or realtime events.
//
// If it's an active WSS connection we have established,
// we should skip this node, as it's not our responsibility
// here to destroy it.
if let None = node_edges.next_edge(&data.graph) {
let mut node_to_remove: Option<&MacAddress> = None;
if let Some(node_weight) = data.graph.node_weight(node_idx) {
if let Some((_, origin)) = data.node_idx_map.get(&node_weight) {
// Skip this node, as it's origin is known from
// uCentral connection, not state data.
if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = origin {
debug!("Not removing disconnected leaf {:?} - reason: uCentral device (direct connection to CGW)", node_weight);
continue;
}
node_to_remove = Some(node_weight);
}
}
if let Some(node_mac) = node_to_remove {
data.node_idx_map.remove(node_mac);
data.graph.remove_node(node_idx);
}
}
}
for x in map_keys_to_remove {
data.edge_idx_map.remove(&x);
for (k, e) in &data.edge_idx_map {
for x in &edges_to_remove {
if *x == e.0 {
map_edge_keys_to_remove.push(k.to_owned());
}
}
}
for key in map_edge_keys_to_remove {
data.edge_idx_map.remove(&key);
}
data.graph.remove_node(node);
}
}
fn add_edge(data: &mut CGWUcentralTopologyMapData, edge: CGWUcentralTopologyEdge) {
let node_src_subedge: CGWUcentralTopologySubEdge = edge.0;
let node_dst_subedge: CGWUcentralTopologySubEdge = edge.1;
let node_src_idx = Self::add_node(data, &node_src_subedge.mac);
let node_dst_idx = Self::add_node(data, &node_dst_subedge.mac);
fn add_edge(
data: &mut CGWUCentralTopologyMapData,
edge: CGWUCentralTopologyEdge,
origin: CGWUCentralTopologyMapEdgeOrigin,
) {
let node_src_subedge: CGWUCentralTopologySubEdge = edge.0;
let node_dst_subedge: CGWUCentralTopologySubEdge = edge.1;
let (node_src_idx, node_dst_idx) = {
(
match data.node_idx_map.get(&node_src_subedge.serial) {
Some((idx, _)) => *idx,
None => {
warn!(
"Tried to add edge for non-existing node {:?}",
node_src_subedge.serial
);
return;
}
},
match data.node_idx_map.get(&node_dst_subedge.serial) {
Some((idx, _)) => *idx,
None => {
warn!(
"Tried to add edge for non-existing node {:?}",
node_dst_subedge.serial
);
return;
}
},
)
};
let edge_idx = data.graph.add_edge(
node_src_idx,
node_dst_idx,
format!("{}<->{}", node_src_subedge.port, node_dst_subedge.port),
format!("{:?}<->{:?}", node_src_subedge.port, node_dst_subedge.port),
);
data.edge_idx_map.insert(
CGWUcentralTopologyEdge(node_src_subedge.clone(), node_dst_subedge.clone()),
edge_idx,
CGWUCentralTopologyEdge(node_src_subedge.clone(), node_dst_subedge.clone()),
(edge_idx, origin.clone()),
);
data.edge_idx_map.insert(
CGWUcentralTopologyEdge(node_dst_subedge, node_src_subedge),
edge_idx,
CGWUCentralTopologyEdge(node_dst_subedge, node_src_subedge),
(edge_idx, origin.clone()),
);
}
fn remove_edge(data: &mut CGWUcentralTopologyMapData, subedge: &CGWUcentralTopologySubEdge) {
let mut keys_to_remove: Vec<CGWUcentralTopologyEdge> = Vec::new();
fn remove_edge(data: &mut CGWUCentralTopologyMapData, subedge: &CGWUCentralTopologySubEdge) {
let mut keys_to_remove: Vec<CGWUCentralTopologyEdge> = Vec::new();
for key in data.edge_idx_map.keys() {
if key.0 == *subedge || key.1 == *subedge {
@@ -191,8 +678,8 @@ impl CGWUcentralTopologyMap {
}
if let Some(key) = keys_to_remove.get(0) {
if let Some(v) = data.edge_idx_map.get(key) {
data.graph.remove_edge(*v);
if let Some((edge_idx, _)) = data.edge_idx_map.get(key) {
data.graph.remove_edge(*edge_idx);
}
}
@@ -211,7 +698,13 @@ impl CGWUcentralTopologyMap {
&|_, er| { format!("label = \"{}\"", er.weight()) },
&|_, nr| { format!("label = \"{}\" shape=\"record\"", nr.weight()) }
)
)
.replace("digraph {", "digraph {\n\trankdir=LR;\n");
debug!(
"graph dump: {} {}\n{}",
lock.node_idx_map.len(),
lock.edge_idx_map.len(),
dotfmt
);
info!("graph dump:\n{}", dotfmt);
}
}

View File

@@ -50,10 +50,10 @@ gen_cert()
local key=$3
local cert=$4
# generate key and request to sign
openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha512 -days 365 \
openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha256 -days 365 \
-extensions $type -subj "/CN=$cn" -out $req_file -keyout $key &> /dev/null
# sign certificate
openssl x509 -extfile $CONF_FILE -CA $CA_CERT -CAkey $CA_KEY -CAcreateserial -sha512 -days 365 \
openssl x509 -extfile $CONF_FILE -CA $CA_CERT -CAkey $CA_KEY -CAcreateserial -sha256 -days 365 \
-in $req_file -out $pem
if [ $? == "0" ]
then
@@ -133,7 +133,7 @@ if [ "$GEN_CA" == "y" ]
then
echo Generating root CA certificate
mkdir -p $CA_DIR
openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha512 -days 365 \
openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha256 -days 365 \
-extensions ca -subj "/CN=CA" -out $CA_CERT -keyout $CA_KEY &> /dev/null
fi