From cee366eb06f9a0bef7fc1dccd0bb885bb608eba0 Mon Sep 17 00:00:00 2001 From: Oleksandr Mazur Date: Fri, 17 May 2024 16:30:21 +0300 Subject: [PATCH] Topo-map: Implement realtime (leave/join) client events handling - Timestamp for events is calculated upon socket read, which is the most accurate TS we can have; - topo map uses the TS in calculations to deduce the 'latest' most-actual event while constructing map; - Origin of edges and nodes is preserved, which means topomap would not delete any nodes missing from state message, in case if this node is also a directly connected uCentral device instance; - Disconnected leaf-nodes are automatically removed, in case if their origin's not uCentral direct connection (info about node received from state/rt evts etc) Signed-off-by: Oleksandr Mazur --- Cargo.toml | 1 + src/cgw_connection_processor.rs | 19 +- src/cgw_connection_server.rs | 6 +- src/cgw_ucentral_ap_parser.rs | 748 ++++++++++++++++++++++--- src/cgw_ucentral_parser.rs | 80 ++- src/cgw_ucentral_switch_parser.rs | 35 +- src/cgw_ucentral_topology_map.rs | 627 ++++++++++++++++++--- utils/cert_generator/generate_certs.sh | 6 +- 8 files changed, 1331 insertions(+), 191 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 29d8f95..ff860fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ base64 = "0.22.0" rustls-pemfile = "2.1.2" rustls-pki-types = "1.7.0" x509-parser = "0.16.0" +chrono = "0.4.38" [build-dependencies] tonic-build = "0.10" diff --git a/src/cgw_connection_processor.rs b/src/cgw_connection_processor.rs index 4c4cdbf..1ae93c8 100644 --- a/src/cgw_connection_processor.rs +++ b/src/cgw_connection_processor.rs @@ -9,9 +9,10 @@ use crate::{ cgw_ucentral_event_parse, cgw_ucentral_parse_connect_event, CGWUCentralCommandType, CGWUCentralEventType, }, - cgw_ucentral_topology_map::CGWUcentralTopologyMap, + cgw_ucentral_topology_map::CGWUCentralTopologyMap, }; +use chrono::offset::Local; use eui48::MacAddress; use futures_util::{ stream::{SplitSink, SplitStream}, @@ -225,15 +226,21 @@ impl CGWConnectionProcessor { fsm_state: &mut CGWUCentralMessageProcessorState, pending_req_id: u64, ) -> Result { + // Make sure we always track the as accurate as possible the time + // of receiving of the event (where needed). + let timestamp = Local::now(); + match msg { Ok(msg) => match msg { Close(_t) => { return Ok(CGWConnectionState::ClosedGracefully); } Text(payload) => { - if let Ok(evt) = cgw_ucentral_event_parse(&device_type, &payload) { + if let Ok(evt) = + cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp()) + { if let CGWUCentralEventType::State(_) = evt.evt_type { - let topo_map = CGWUcentralTopologyMap::get_ref(); + let topo_map = CGWUCentralTopologyMap::get_ref(); topo_map.process_state_message(&device_type, evt).await; topo_map.debug_dump_map().await; } else if let CGWUCentralEventType::Reply(content) = evt.evt_type { @@ -241,6 +248,12 @@ impl CGWConnectionProcessor { assert_eq!(content.id, pending_req_id); *fsm_state = CGWUCentralMessageProcessorState::Idle; debug!("Got reply event for pending request id: {}", pending_req_id); + } else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type { + let topo_map = CGWUCentralTopologyMap::get_ref(); + topo_map + .process_device_topology_event(&device_type, evt) + .await; + topo_map.debug_dump_map().await; } } diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 870529c..49f1098 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -7,7 +7,7 @@ use crate::cgw_ucentral_messages_queue_manager::{ }; use crate::cgw_ucentral_parser::{cgw_ucentral_parse_command_message, CGWUCentralCommand}; use crate::cgw_ucentral_parser::{CGWDeviceChange, CGWDeviceChangedData, CGWToNBMessageType}; -use crate::cgw_ucentral_topology_map::CGWUcentralTopologyMap; +use crate::cgw_ucentral_topology_map::CGWUCentralTopologyMap; use crate::AppArgs; use crate::{ @@ -1025,7 +1025,7 @@ impl CGWConnectionServer { ); } - let topo_map = CGWUcentralTopologyMap::get_ref(); + let topo_map = CGWUCentralTopologyMap::get_ref(); topo_map.insert_device(&device_mac).await; topo_map.debug_dump_map().await; @@ -1061,7 +1061,7 @@ impl CGWConnectionServer { devices_cache.dump_devices_cache(); } - let topo_map = CGWUcentralTopologyMap::get_ref(); + let topo_map = CGWUCentralTopologyMap::get_ref(); topo_map.remove_device(&device_mac).await; topo_map.debug_dump_map().await; diff --git a/src/cgw_ucentral_ap_parser.rs b/src/cgw_ucentral_ap_parser.rs index 3ec4f39..edae88b 100644 --- a/src/cgw_ucentral_ap_parser.rs +++ b/src/cgw_ucentral_ap_parser.rs @@ -1,49 +1,681 @@ use base64::prelude::*; use eui48::MacAddress; use flate2::read::ZlibDecoder; -use serde_json::Value; +use serde_json::{Map, Value}; use std::io::prelude::*; -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; use crate::cgw_ucentral_parser::{ CGWUCentralEvent, CGWUCentralEventConnect, CGWUCentralEventConnectParamsCaps, - CGWUCentralEventLog, CGWUCentralEventReply, CGWUCentralEventState, - CGWUCentralEventStateLLDPData, CGWUCentralEventStateLLDPDataLinks, CGWUCentralEventType, - CGWUcentralJRPCMessage, + CGWUCentralEventLog, CGWUCentralEventRealtimeEvent, CGWUCentralEventRealtimeEventType, + CGWUCentralEventRealtimeEventWClientJoin, CGWUCentralEventRealtimeEventWClientLeave, + CGWUCentralEventReply, CGWUCentralEventState, CGWUCentralEventStateClients, + CGWUCentralEventStateClientsData, CGWUCentralEventStateClientsType, + CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventType, + CGWUCentralJRPCMessage, }; -fn parse_lldp_data(data: &Value) -> Vec { - let mut links: Vec = Vec::new(); +fn parse_lldp_data(lldp_peers: &Map, links: &mut Vec) { + let directions = [ + (lldp_peers["upstream"].as_object().unwrap(), false), + (lldp_peers["downstream"].as_object().unwrap(), true), + ]; - if let Value::Object(map) = data { - let directions = [ - (map["upstream"].as_object().unwrap(), false), - (map["downstream"].as_object().unwrap(), true), - ]; + for (d, is_downstream) in directions { + for (key, value) in d { + let data = value.as_array().unwrap()[0].as_object().unwrap(); + let local_port = key.to_string().replace("WAN", "eth0").replace("LAN", "eth"); + let remote_serial = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap(); + let remote_port = data["port_id"].as_str().unwrap().to_string(); - for (d, is_downstream) in directions { - for (key, value) in d { - let data = value.as_array().unwrap()[0].as_object().unwrap(); + links.push(CGWUCentralEventStateLinks { + local_port, + remote_serial, + remote_port, + is_downstream, + }); + } + } +} - let local_port = key.to_string().replace("WAN", "eth0").replace("LAN", "eth"); - let remote_mac = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap(); - let remote_port = data["port_id"].as_str().unwrap().to_string(); +fn parse_wireless_ssids_info( + ssids: &Vec, + ssids_map: &mut HashMap, +) { + for s in ssids { + if let Value::Object(ssid) = s { + if !ssid.contains_key("band") + || !ssid.contains_key("bssid") + || !ssid.contains_key("ssid") + { + continue; + } - links.push(CGWUCentralEventStateLLDPDataLinks { - local_port, - remote_mac, - remote_port, - is_downstream, - }); + let band_value = { + if let Value::String(band_str) = &ssid["band"] { + band_str.clone() + } else { + continue; + } + }; + let bssid_value = { + if let Value::String(bssid_str) = &ssid["bssid"] { + bssid_str.clone() + } else { + continue; + } + }; + let ssid_value = { + if let Value::String(ssid_str) = &ssid["ssid"] { + ssid_str.clone() + } else { + continue; + } + }; + + ssids_map.insert(bssid_value, (ssid_value, band_value)); + } + } +} + +fn parse_wireless_clients_data( + ssids: &Vec, + links: &mut Vec, + upstream_ifaces: &Vec, + ssids_map: &HashMap, + timestamp: i64, +) { + for s in ssids { + if let Value::Object(ssid) = s { + let local_port = { + if let Value::String(port) = &ssid["iface"] { + port.clone() + } else { + warn!("Failed to retrieve local_port for {:?}", ssid); + continue; + } + }; + + // Upstream WAN iface? Not supported + if upstream_ifaces.iter().any(|i| *i == local_port) { + warn!( + "Skipped ssid wireless client info {:?} for upstream interface", + ssid + ); + continue; + } + + if !ssid.contains_key("associations") { + warn!("Failed to retrieve associations for {local_port}"); + continue; + } + + if let Value::Array(associations) = &ssid["associations"] { + for association in associations { + let mut ts = 0i64; + if let Value::Object(map) = association { + if !map.contains_key("station") || !map.contains_key("connected") { + continue; + } + + let bssid_value = { + if let Value::String(bssid) = &map["bssid"] { + bssid.clone() + } else { + continue; + } + }; + + let (ssid, band) = { + if let Some(v) = ssids_map.get(&bssid_value) { + (v.0.clone(), v.1.clone()) + } else { + warn!("Failed to get ssid/band value for {bssid_value}"); + continue; + } + }; + + let remote_serial = + MacAddress::from_str(map["station"].as_str().unwrap()).unwrap(); + + // Time, for how long this connection's been associated + // with the AP that reports this data. + if let Value::Number(t) = &map["connected"] { + ts = t.as_i64().unwrap(); + } + + links.push(CGWUCentralEventStateClients { + client_type: CGWUCentralEventStateClientsType::Wireless( + // Track timestamp of initial connection: + // if we receive state evt , substract + // connected since from it, to get + // original connection timestamp. + timestamp - ts, + ssid, + band, + ), + local_port: local_port.clone(), + remote_serial, + // TODO: rework remote_port to have Band, RSSI, chan etc + // for an edge. + remote_port: "".to_string(), + is_downstream: true, + }); + } + } } } } - - links } -pub fn cgw_ucentral_ap_parse_message(message: &String) -> Result { - let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) { +fn parse_wired_clients_data( + clients: &Vec, + links: &mut Vec, + upstream_ifaces: &Vec, + timestamp: i64, +) { + for client in clients { + let local_port = { + if let Value::Array(arr) = &client["ports"] { + match arr[0].as_str() { + Some(s) => s.to_string(), + None => { + warn!( + "Failed to get clients port string for {:?}, skipping", + client + ); + continue; + } + } + } else { + warn!("Failed to parse clients port for {:?}, skipping", client); + continue; + } + }; + + // Skip wireless clients info + if local_port.contains("wlan") || local_port.contains("WLAN") { + continue; + } + + // TODO: W/A for now: ignore any upstream-reported clients, + // because it includes ARP neighbours and clients. + // The logic to process uplink neighbors properly should be + // much more complicated and should be treated as a + // separate case. + // This logic also overlaps the uplink switch's neighbor + // detection logic (LLDP, ARP, FDB etc), and building topo map + // based purely on AP's input could result in invalid + // map formation. + if upstream_ifaces.iter().any(|i| *i == local_port) { + continue; + } + + let remote_serial = MacAddress::from_str(client["mac"].as_str().unwrap()).unwrap(); + + links.push(CGWUCentralEventStateClients { + // Wired clients don't have data. + // Treat as latest connected ts. + client_type: CGWUCentralEventStateClientsType::Wired(timestamp), + local_port, + remote_serial, + // TODO: rework remote_port to have speed / duplex characteristics + // for an edge. + remote_port: "".to_string(), + is_downstream: true, + }); + } +} + +fn parse_interface_data( + interface: &Map, + links: &mut Vec, + upstream_ifaces: &Vec, + timestamp: i64, +) { + if interface.contains_key("clients") { + if let Value::Array(clients) = &interface["clients"] { + parse_wired_clients_data(&clients, links, upstream_ifaces, timestamp); + } + } + + if interface.contains_key("ssids") { + let mut ssids_map: HashMap = HashMap::new(); + if let Value::Array(ssids) = &interface["ssids"] { + parse_wireless_ssids_info(ssids, &mut ssids_map); + parse_wireless_clients_data(&ssids, links, upstream_ifaces, &ssids_map, timestamp); + } + } +} + +fn parse_link_state_data( + link_state: &Map, + upstream_ifaces: &mut Vec, + downstream_ifaces: &mut Vec, +) { + if let Value::Object(upstream_obj) = &link_state["upstream"] { + for (k, _v) in upstream_obj.iter() { + upstream_ifaces.push(k.to_string()); + } + } + + if let Value::Object(downstream_obj) = &link_state["downstream"] { + for (k, _v) in downstream_obj.iter() { + downstream_ifaces.push(k.to_string()); + } + } +} + +fn parse_state_event_data( + map: CGWUCentralJRPCMessage, + timestamp: i64, +) -> Result { + if !map.contains_key("params") { + return Err("Invalid state event received: params is missing"); + } + + let params = &map["params"]; + + if let Value::String(compressed_data) = ¶ms["compress_64"] { + let decoded_data = match BASE64_STANDARD.decode(compressed_data) { + Ok(d) => d, + Err(e) => { + warn!("Failed to decode base64+zip state evt {e}"); + return Err("Failed to decode base64+zip state evt"); + } + }; + let mut d = ZlibDecoder::new(&decoded_data[..]); + let mut unzipped_data = String::new(); + if let Err(e) = d.read_to_string(&mut unzipped_data) { + warn!("Failed to decompress decrypted state message {e}"); + return Err("Failed to decompress decrypted state message"); + } + + let state_map: CGWUCentralJRPCMessage = match serde_json::from_str(&unzipped_data) { + Ok(m) => m, + Err(e) => { + error!("Failed to parse input state message {e}"); + return Err("Failed to parse input state message"); + } + }; + + let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap(); + + if let Value::Object(state_map) = &state_map["state"] { + let mut lldp_links: Vec = Vec::new(); + let mut clients_links: Vec = Vec::new(); + + if state_map.contains_key("lldp-peers") { + if let Value::Object(v) = &state_map["lldp-peers"] { + parse_lldp_data(&v, &mut lldp_links); + } + } + + let mut upstream_ifaces: Vec = Vec::new(); + let mut downstream_ifaces: Vec = Vec::new(); + + if state_map.contains_key("link-state") { + if let Value::Object(obj) = &state_map["link-state"] { + parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces); + } + } + + if let Value::Array(arr) = &state_map["interfaces"] { + for interface in arr { + if let Value::Object(iface) = interface { + parse_interface_data( + &iface, + &mut clients_links, + &mut upstream_ifaces, + timestamp, + ); + } + } + } + + let state_event = CGWUCentralEvent { + serial, + evt_type: CGWUCentralEventType::State(CGWUCentralEventState { + timestamp, + local_mac: serial.clone(), + lldp_data: CGWUCentralEventStateLLDPData { links: lldp_links }, + clients_data: CGWUCentralEventStateClientsData { + links: clients_links, + }, + }), + }; + + return Ok(state_event); + } + + return Err("Parsed, decompressed state message but failed to find state object"); + } else if let Value::Object(state_map) = ¶ms["state"] { + let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap(); + let mut lldp_links: Vec = Vec::new(); + let mut clients_links: Vec = Vec::new(); + + if state_map.contains_key("lldp-peers") { + if let Value::Object(v) = &state_map["lldp-peers"] { + parse_lldp_data(&v, &mut lldp_links); + } + } + + let mut upstream_ifaces: Vec = Vec::new(); + let mut downstream_ifaces: Vec = Vec::new(); + + if state_map.contains_key("link-state") { + if let Value::Object(obj) = &state_map["link-state"] { + parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces); + } + } + + if let Value::Array(arr) = &state_map["interfaces"] { + for interface in arr { + if let Value::Object(iface) = interface { + parse_interface_data( + &iface, + &mut clients_links, + &mut upstream_ifaces, + timestamp, + ); + } + } + } + + let state_event = CGWUCentralEvent { + serial, + evt_type: CGWUCentralEventType::State(CGWUCentralEventState { + timestamp, + local_mac: serial, + lldp_data: CGWUCentralEventStateLLDPData { links: lldp_links }, + clients_data: CGWUCentralEventStateClientsData { + links: clients_links, + }, + }), + }; + + return Ok(state_event); + } + + return Err("Failed to parse state event"); +} + +fn parse_realtime_event_data( + map: CGWUCentralJRPCMessage, + timestamp: i64, +) -> Result { + if !map.contains_key("params") { + return Err("Invalid event received: params is missing"); + } + + let params = &map["params"]; + + let serial = { + match params["serial"].as_str() { + Some(v) => match MacAddress::from_str(v) { + Ok(serial) => serial, + Err(_) => { + return Err("Invalid event received: serial is an invalid MAC address"); + } + }, + None => { + return Err("Invalid event received: serial is missing"); + } + } + }; + let events = match ¶ms["data"]["event"] { + Value::Array(events) => events, + _ => { + return Err("Invalid event received: data:event missing"); + } + }; + + if events.len() < 2 { + warn!("Received malformed event: number of event values < 2"); + return Err("Received malformed event: number of event values < 2"); + } + + // We don't actually care about this TS, but it's a format-abiding requirement + // put onto the AP's (we expect ts to be there). + match &events[0] { + Value::Number(ts) => { + if let None = ts.as_i64() { + warn!("Received malformed event: missing timestamp"); + return Err("Received malformed event: missing timestamp"); + } + } + _ => { + warn!("Received malformed event: missing timestamp"); + return Err("Received malformed event: missing timestamp"); + } + }; + + let event_data = match &events[1] { + Value::Object(v) => v, + _ => { + warn!("Received malformed event: missing timestamp"); + return Err("Received malformed event: missing timestamp"); + } + }; + + if !event_data.contains_key("type") { + warn!("Received malformed event: missing type"); + return Err("Received malformed event: missing type"); + } + + let evt_type = match &event_data["type"] { + Value::String(t) => t, + _ => { + warn!("Received malformed event: type is of wrongful underlying format/type"); + return Err("Received malformed event: type is of wrongful underlying format/type"); + } + }; + + let evt_payload = match &event_data["payload"] { + Value::Object(d) => d, + _ => { + warn!("Received malformed event: payload is of wrongful underlying format/type"); + return Err("Received malformed event: payload is of wrongful underlying format/type"); + } + }; + + match evt_type.as_str() { + "client.join" => { + if !evt_payload.contains_key("band") + || !evt_payload.contains_key("client") + || !evt_payload.contains_key("ssid") + || !evt_payload.contains_key("rssi") + || !evt_payload.contains_key("channel") + { + warn!("Received malformed client.join event: band, rssi, ssid, channel and client are required"); + return Err("Received malformed client.join event: band, rssi, ssid, channel and client are required"); + } + + let band = { + match &evt_payload["band"] { + Value::String(s) => s, + _ => { + warn!("Received malformed client.join event: band is of wrongful underlying format/type"); + return Err( + "Received malformed client.join event: band is of wrongful underlying format/type", + ); + } + } + }; + let client = { + match &evt_payload["client"] { + Value::String(s) => match MacAddress::from_str(s.as_str()) { + Ok(v) => v, + Err(_) => { + warn!("Received malformed client.join event: client is a malformed MAC address"); + return Err( + "Received malformed client.join event: client is a malformed MAC address", + ); + } + }, + _ => { + warn!("Received malformed client.join event: client is of wrongful underlying format/type"); + return Err( + "Received malformed client.join event: client is of wrongful underlying format/type", + ); + } + } + }; + let ssid = { + match &evt_payload["ssid"] { + Value::String(s) => s, + _ => { + warn!("Received malformed client.join event: ssid is of wrongful underlying format/type"); + return Err( + "Received malformed client.join event: ssid is of wrongful underlying format/type", + ); + } + } + }; + let rssi = { + match &evt_payload["rssi"] { + Value::Number(s) => match s.as_i64() { + Some(v) => v, + None => { + warn!("Received malformed client.join event: rssi is NaN?"); + return Err("Received malformed client.join event: rssi is NaN?"); + } + }, + _ => { + warn!("Received malformed client.join event: rssi is of wrongful underlying format/type"); + return Err( + "Received malformed client.join event: rssi is of wrongful underlying format/type", + ); + } + } + }; + let channel = { + match &evt_payload["channel"] { + Value::Number(s) => match s.as_u64() { + Some(v) => v, + None => { + warn!("Received malformed client.join event: channel is NaN?"); + return Err("Received malformed client.join event: channel is NaN?"); + } + }, + _ => { + warn!("Received malformed client.join event: channel is of wrongful underlying format/type"); + return Err( + "Received malformed client.join event: channel is of wrongful underlying format/type", + ); + } + } + }; + + return Ok(CGWUCentralEvent { + serial, + evt_type: CGWUCentralEventType::RealtimeEvent(CGWUCentralEventRealtimeEvent { + // For client.join the timestamp is (whenever event's + // been received) + timestamp, + evt_type: CGWUCentralEventRealtimeEventType::WirelessClientJoin( + CGWUCentralEventRealtimeEventWClientJoin { + client, + band: band.to_string(), + ssid: ssid.to_string(), + rssi, + channel, + }, + ), + }), + }); + } + "client.leave" => { + if !evt_payload.contains_key("band") + || !evt_payload.contains_key("client") + || !evt_payload.contains_key("connected_time") + { + warn!("Received malformed client.leave event: client, band and connected_time is required"); + return Err("Received malformed client.leave event: client, band and connected_time is required"); + } + + let band = { + match &evt_payload["band"] { + Value::String(s) => s, + _ => { + warn!("Received malformed client.leave event: band is of wrongful underlying format/type"); + return Err( + "Received malformed client.leave event: band is of wrongful underlying format/type", + ); + } + } + }; + let client = { + match &evt_payload["client"] { + Value::String(s) => match MacAddress::from_str(s.as_str()) { + Ok(v) => v, + Err(_) => { + warn!("Received malformed client.leave event: client is a malformed MAC address"); + return Err( + "Received malformed client.leave event: client is a malformed MAC address", + ); + } + }, + _ => { + warn!("Received malformed client.leave event: client is of wrongful underlying format/type"); + return Err( + "Received malformed client.leave event: client is of wrongful underlying format/type", + ); + } + } + }; + let connected_time = { + match &evt_payload["connected_time"] { + Value::Number(s) => match s.as_i64() { + Some(v) => v, + None => { + warn!("Received malformed client.leave event: connected_time is NaN?"); + return Err( + "Received malformed client.leave event: connected_time is NaN?", + ); + } + }, + _ => { + warn!("Received malformed client.leave event: connected_time is of wrongful underlying format/type"); + return Err( + "Received malformed client.leave event: connected_time is of wrongful underlying format/type", + ); + } + } + }; + return Ok(CGWUCentralEvent { + serial, + evt_type: CGWUCentralEventType::RealtimeEvent(CGWUCentralEventRealtimeEvent { + // For client.leave the timestamp is: substracted with + // the time device's been connected to the AP, + // which translates into: + // timestamp is equal to whenever connection's been + // established originally. + // And in case if client.join/state message + // is already registered within the CGW, + // this event will be simply dropped. + timestamp: timestamp - connected_time, + evt_type: CGWUCentralEventRealtimeEventType::WirelessClientLeave( + CGWUCentralEventRealtimeEventWClientLeave { + client, + band: band.to_string(), + }, + ), + }), + }); + } + _ => { + warn!("Received unknown event: {evt_type}"); + return Err("Received unknown event"); + } + } +} + +pub fn cgw_ucentral_ap_parse_message( + message: &String, + timestamp: i64, +) -> Result { + let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { error!("Failed to parse input json {e}"); @@ -96,63 +728,9 @@ pub fn cgw_ucentral_ap_parse_message(message: &String) -> Result d, - Err(e) => { - warn!("Failed to decode base64+zip state evt {e}"); - return Err("Failed to decode base64+zip state evt"); - } - }; - let mut d = ZlibDecoder::new(&decoded_data[..]); - let mut unzipped_data = String::new(); - if let Err(e) = d.read_to_string(&mut unzipped_data) { - warn!("Failed to decompress decrypted state message {e}"); - return Err("Failed to decompress decrypted state message"); - } - - let state_map: CGWUcentralJRPCMessage = match serde_json::from_str(&unzipped_data) { - Ok(m) => m, - Err(e) => { - error!("Failed to parse input state message {e}"); - return Err("Failed to parse input state message"); - } - }; - - let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap(); - - if let Value::Object(state_map) = &state_map["state"] { - let state_event = CGWUCentralEvent { - serial, - evt_type: CGWUCentralEventType::State(CGWUCentralEventState { - lldp_data: CGWUCentralEventStateLLDPData { - local_mac: serial, - links: parse_lldp_data(&state_map["lldp-peers"]), - }, - }), - }; - - return Ok(state_event); - } - - return Err("Parsed, decompressed state message but failed to find state object"); - } else if let Value::Object(state_map) = ¶ms["state"] { - let serial = MacAddress::from_str(state_map["serial"].as_str().unwrap()).unwrap(); - - let state_event = CGWUCentralEvent { - serial, - evt_type: CGWUCentralEventType::State(CGWUCentralEventState { - lldp_data: CGWUCentralEventStateLLDPData { - local_mac: serial, - links: parse_lldp_data(&state_map["lldp-peers"]), - }, - }), - }; - - return Ok(state_event); - } + return parse_state_event_data(map, timestamp); + } else if method == "event" { + return parse_realtime_event_data(map, timestamp); } } else if map.contains_key("result") { if !map.contains_key("id") { diff --git a/src/cgw_ucentral_parser.rs b/src/cgw_ucentral_parser.rs index 7134e10..b132677 100644 --- a/src/cgw_ucentral_parser.rs +++ b/src/cgw_ucentral_parser.rs @@ -11,7 +11,7 @@ use crate::{ cgw_ucentral_switch_parser::cgw_ucentral_switch_parse_message, }; -pub type CGWUcentralJRPCMessage = Map; +pub type CGWUCentralJRPCMessage = Map; #[derive(Debug, Default, Deserialize, Serialize, PartialEq)] pub struct CGWUCentralEventLog { @@ -37,29 +37,50 @@ pub struct CGWUCentralEventConnect { } #[derive(Debug, Default, Deserialize, Serialize, PartialEq)] -pub struct CGWUCentralEventStateLLDPDataLinks { +pub struct CGWUCentralEventStateLinks { pub local_port: String, #[serde(skip)] - pub remote_mac: MacAddress, + pub remote_serial: MacAddress, + pub remote_port: String, + pub is_downstream: bool, +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub enum CGWUCentralEventStateClientsType { + Wired(i64), + // Ssid, Band + Wireless(i64, String, String), +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct CGWUCentralEventStateClients { + pub client_type: CGWUCentralEventStateClientsType, + pub local_port: String, + #[serde(skip)] + pub remote_serial: MacAddress, pub remote_port: String, pub is_downstream: bool, } #[derive(Debug, Default, Deserialize, Serialize, PartialEq)] pub struct CGWUCentralEventStateLLDPData { - // Parsed State LLDP data: - // mac address of the device reporting the LLDP data - #[serde(skip)] - pub local_mac: MacAddress, - // links reported by the device: - // local port, remote mac, remote port - pub links: Vec, + pub links: Vec, +} + +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct CGWUCentralEventStateClientsData { + // links reported by the device (wired and wireless): + pub links: Vec, } #[derive(Debug, Default, Deserialize, Serialize, PartialEq)] pub struct CGWUCentralEventState { + // mac address of the device reporting the state evt + pub local_mac: MacAddress, + pub timestamp: i64, pub lldp_data: CGWUCentralEventStateLLDPData, + pub clients_data: CGWUCentralEventStateClientsData, } #[derive(Debug, Default, Deserialize, Serialize, PartialEq)] @@ -67,6 +88,35 @@ pub struct CGWUCentralEventReply { pub id: u64, } +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct CGWUCentralEventRealtimeEventWClientJoin { + pub client: MacAddress, + pub band: String, + pub ssid: String, + pub rssi: i64, + pub channel: u64, +} + +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct CGWUCentralEventRealtimeEventWClientLeave { + pub client: MacAddress, + pub band: String, +} + +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] +pub enum CGWUCentralEventRealtimeEventType { + WirelessClientJoin(CGWUCentralEventRealtimeEventWClientJoin), + WirelessClientLeave(CGWUCentralEventRealtimeEventWClientLeave), + #[default] + None, +} + +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct CGWUCentralEventRealtimeEvent { + pub evt_type: CGWUCentralEventRealtimeEventType, + pub timestamp: i64, +} + #[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum CGWUCentralEventType { Connect(CGWUCentralEventConnect), @@ -83,6 +133,7 @@ pub enum CGWUCentralEventType { Ping, Recovery, VenueBroadcast, + RealtimeEvent(CGWUCentralEventRealtimeEvent), Reply(CGWUCentralEventReply), } @@ -180,7 +231,7 @@ pub fn cgw_ucentral_parse_connect_event( return Err("Message to string cast failed"); }; - let map: CGWUcentralJRPCMessage = match serde_json::from_str(&msg) { + let map: CGWUCentralJRPCMessage = match serde_json::from_str(&msg) { Ok(m) => m, Err(e) => { error!("Failed to parse input json {e}"); @@ -230,7 +281,7 @@ pub fn cgw_ucentral_parse_connect_event( pub fn cgw_ucentral_parse_command_message( message: &String, ) -> Result { - let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) { + let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { error!("Failed to parse input json {e}"); @@ -282,9 +333,10 @@ pub fn cgw_ucentral_parse_command_message( pub fn cgw_ucentral_event_parse( device_type: &CGWDeviceType, message: &String, + timestamp: i64, ) -> Result { match device_type { - CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(&message), - CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(&message), + CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(&message, timestamp), + CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(&message, timestamp), } } diff --git a/src/cgw_ucentral_switch_parser.rs b/src/cgw_ucentral_switch_parser.rs index ce312a4..4e85943 100644 --- a/src/cgw_ucentral_switch_parser.rs +++ b/src/cgw_ucentral_switch_parser.rs @@ -3,15 +3,13 @@ use serde_json::Value; use std::str::FromStr; use crate::cgw_ucentral_parser::{ - CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateLLDPData, - CGWUCentralEventStateLLDPDataLinks, CGWUCentralEventType, CGWUcentralJRPCMessage, + CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateClientsData, + CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventType, + CGWUCentralJRPCMessage, }; -fn parse_lldp_data( - data: &Value, - upstream_port: Option, -) -> Vec { - let mut links: Vec = Vec::new(); +fn parse_lldp_data(data: &Value, upstream_port: Option) -> Vec { + let mut links: Vec = Vec::new(); if let Value::Object(map) = data { let directions = [ @@ -24,7 +22,7 @@ fn parse_lldp_data( let data = value.as_array().unwrap()[0].as_object().unwrap(); let local_port = key.to_string(); - let remote_mac = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap(); + let remote_serial = MacAddress::from_str(data["mac"].as_str().unwrap()).unwrap(); let remote_port = data["port"].as_str().unwrap().to_string(); let is_downstream: bool = { if let Some(ref port) = upstream_port { @@ -34,9 +32,9 @@ fn parse_lldp_data( } }; - links.push(CGWUCentralEventStateLLDPDataLinks { + links.push(CGWUCentralEventStateLinks { local_port, - remote_mac, + remote_serial, remote_port, is_downstream, }); @@ -49,8 +47,9 @@ fn parse_lldp_data( pub fn cgw_ucentral_switch_parse_message( message: &String, + timestamp: i64, ) -> Result { - let map: CGWUcentralJRPCMessage = match serde_json::from_str(message) { + let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { error!("Failed to parse input json {e}"); @@ -90,10 +89,12 @@ pub fn cgw_ucentral_switch_parse_message( if let Value::Object(state_map) = ¶ms["state"] { let serial = MacAddress::from_str(params["serial"].as_str().unwrap()).unwrap(); let mut upstream_port: Option = None; - if let Value::Array(default_gw) = &state_map["default-gateway"] { - if let Some(gw) = default_gw.get(0) { - if let Value::String(port) = &gw["out-port"] { - upstream_port = Some(port.as_str().to_string()); + if state_map.contains_key("default-gateway") { + if let Value::Array(default_gw) = &state_map["default-gateway"] { + if let Some(gw) = default_gw.get(0) { + if let Value::String(port) = &gw["out-port"] { + upstream_port = Some(port.as_str().to_string()); + } } } } @@ -101,10 +102,12 @@ pub fn cgw_ucentral_switch_parse_message( let state_event = CGWUCentralEvent { serial, evt_type: CGWUCentralEventType::State(CGWUCentralEventState { + timestamp, + local_mac: serial, lldp_data: CGWUCentralEventStateLLDPData { - local_mac: serial, links: parse_lldp_data(&state_map["lldp-peers"], upstream_port), }, + clients_data: CGWUCentralEventStateClientsData { links: Vec::new() }, }), }; diff --git a/src/cgw_ucentral_topology_map.rs b/src/cgw_ucentral_topology_map.rs index 1e7f10c..bd13864 100644 --- a/src/cgw_ucentral_topology_map.rs +++ b/src/cgw_ucentral_topology_map.rs @@ -1,6 +1,9 @@ use crate::{ cgw_device::CGWDeviceType, - cgw_ucentral_parser::{CGWUCentralEvent, CGWUCentralEventType}, + cgw_ucentral_parser::{ + CGWUCentralEvent, CGWUCentralEventRealtimeEventType, CGWUCentralEventStateClientsType, + CGWUCentralEventType, + }, }; use petgraph::dot::{Config, Dot}; use petgraph::{ @@ -15,34 +18,83 @@ use tokio::sync::RwLock; use eui48::MacAddress; +type WirelessClientBand = String; +type WirelessClientSsid = String; + // One 'slice' / part of edge (Mac + port); // To make a proper complete edge two parts needed: // SRC -> DST #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct CGWUcentralTopologySubEdge { - pub mac: MacAddress, - pub port: String, +pub enum CGWUCentralTopologySubEdgePort { + // Used in subedge + PhysicalWiredPort(String), + WirelessPort, + + // Used in subedge + // Wired client reported by AP (no dst port info available) + // TODO: Duplex speed? + WiredClient, + // Wieless client reported by AP: SSID + Band + WirelessClient(WirelessClientSsid, WirelessClientBand), +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct CGWUCentralTopologySubEdge { + pub serial: MacAddress, + pub port: CGWUCentralTopologySubEdgePort, } // Complete edge consisting of SRC -> DST 'sub-edges' #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct CGWUcentralTopologyEdge(CGWUcentralTopologySubEdge, CGWUcentralTopologySubEdge); +pub struct CGWUCentralTopologyEdge(CGWUCentralTopologySubEdge, CGWUCentralTopologySubEdge); -#[derive(Debug)] -struct CGWUcentralTopologyMapData { - node_idx_map: HashMap, - edge_idx_map: HashMap, - graph: StableGraph, +type EdgeCreationTimestamp = i64; + +// We have to track the 'origin' of any node we add to topo map, +// because deletion decision should be made on the following basis: +// - direct WSS connection should be always kept in the topo map, +// and only erased when disconnect happens; +// - any nodes, that are added to topo map as 'clients' (lldp peers, +// wired and wireless clients, fdb info) should be deleted when the node +// that reported them gets deleted; +// however if 'client' node also exists in topo map, and is currently connected +// to CGW (WSS), then it should be left untouched; +#[derive(Debug, Clone)] +enum CGWUCentralTopologyMapNodeOrigin { + UCentralDevice, + StateLLDPPeer, + StateWiredWireless, + StateFDB, +} + +// We have to track the 'origin' of any edge we add to topo map, +// because deletion decision should be made on the following basis: +// - 'client.leave' should remove edge (only if join timestamp < leave timestamp); +// - 'client.join' should remove edge and create (potentially with new SRC node) +// a new edge with device/node that reports this event. +// Only, if join timestamp > join timestamp (state evt, realtime join evt) +#[derive(Debug, Clone)] +enum CGWUCentralTopologyMapEdgeOrigin { + StateLLDPPeer, + StateWiredWireless(EdgeCreationTimestamp), + StateFDB, } #[derive(Debug)] -pub struct CGWUcentralTopologyMap { - data: Arc>, +struct CGWUCentralTopologyMapData { + node_idx_map: HashMap, + edge_idx_map: HashMap, + graph: StableGraph, +} + +#[derive(Debug)] +pub struct CGWUCentralTopologyMap { + data: Arc>, } lazy_static! { - pub static ref CGW_UCENTRAL_TOPOLOGY_MAP: CGWUcentralTopologyMap = CGWUcentralTopologyMap { - data: Arc::new(RwLock::new(CGWUcentralTopologyMapData { + pub static ref CGW_UCENTRAL_TOPOLOGY_MAP: CGWUCentralTopologyMap = CGWUCentralTopologyMap { + data: Arc::new(RwLock::new(CGWUCentralTopologyMapData { node_idx_map: HashMap::new(), edge_idx_map: HashMap::new(), graph: StableGraph::new(), @@ -50,19 +102,23 @@ lazy_static! { }; } -impl CGWUcentralTopologyMap { +impl CGWUCentralTopologyMap { pub fn get_ref() -> &'static Self { &CGW_UCENTRAL_TOPOLOGY_MAP } - pub async fn insert_device(self: &Self, mac: &MacAddress) { + pub async fn insert_device(self: &Self, serial: &MacAddress) { let mut lock = self.data.write().await; - Self::add_node(&mut lock, mac); + Self::add_node( + &mut lock, + serial, + CGWUCentralTopologyMapNodeOrigin::UCentralDevice, + ); } - pub async fn remove_device(self: &Self, mac: &MacAddress) { + pub async fn remove_device(self: &Self, serial: &MacAddress) { let mut lock = self.data.write().await; - Self::remove_node(&mut lock, mac); + Self::remove_node(&mut lock, serial); } pub async fn process_state_message( @@ -73,21 +129,33 @@ impl CGWUcentralTopologyMap { let mut lock = self.data.write().await; if let CGWUCentralEventType::State(s) = evt.evt_type { + // To make sure any leftovers are handled, node that reports + // state message is getting purged and recreated: + // since state message hold necessary information, + // we can safely purge all edge info and recreate it from + // the state message. Any missed / deleted by mistake + // edges will appear on the next iteration of state / realtime event + // processing. + Self::remove_node(&mut lock, &s.local_mac); + + // Re-create node with origin being UCentralDevice, as this + // device is directly connected to the CGW. + Self::add_node( + &mut lock, + &s.local_mac, + CGWUCentralTopologyMapNodeOrigin::UCentralDevice, + ); + // Start with LLDP info processing for link in s.lldp_data.links { - let subedge_src = CGWUcentralTopologySubEdge { - mac: s.lldp_data.local_mac, - port: link.local_port, + let subedge_src = CGWUCentralTopologySubEdge { + serial: s.local_mac, + port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort(link.local_port), }; - let subedge_dst = CGWUcentralTopologySubEdge { - mac: link.remote_mac, - port: link.remote_port, + let subedge_dst = CGWUCentralTopologySubEdge { + serial: link.remote_serial, + port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort(link.remote_port), }; - // Any neighbour seen in LLDP is added to the graph. - // Whenever parent (entity reporting the LLDP data) - // get's removed - neighbour nodes and connected - // edges will be purged. - Self::add_node(&mut lock, &link.remote_mac); // No duplicates can exists, since it's LLDP data // (both uCentral and underlying LLDP agents do not @@ -99,25 +167,311 @@ impl CGWUcentralTopologyMap { Self::remove_edge(&mut lock, &subedge_src); Self::remove_edge(&mut lock, &subedge_dst); + // Any neighbour seen in LLDP is added to the graph. + // Whenever parent (entity reporting the LLDP data) + // get's removed - neighbour nodes and connected + // edges will be purged. + Self::add_node( + &mut lock, + &link.remote_serial, + CGWUCentralTopologyMapNodeOrigin::StateLLDPPeer, + ); + if link.is_downstream { - Self::add_edge(&mut lock, CGWUcentralTopologyEdge(subedge_src, subedge_dst)); + Self::add_edge( + &mut lock, + CGWUCentralTopologyEdge(subedge_src, subedge_dst), + CGWUCentralTopologyMapEdgeOrigin::StateLLDPPeer, + ); } else { - Self::add_edge(&mut lock, CGWUcentralTopologyEdge(subedge_dst, subedge_src)); + Self::add_edge( + &mut lock, + CGWUCentralTopologyEdge(subedge_dst, subedge_src), + CGWUCentralTopologyMapEdgeOrigin::StateLLDPPeer, + ); + } + } + + // Clients data second iteration: + // add all nodes seen in clients; + // add new edges; + for link in &s.clients_data.links { + // Treat state timestamp as edge-creation timestamp only for + // events that do not report explicit connection timestamp + // (no association establishment timestamp for wired clients, + // however present for wireless for example). + let mut link_timestamp = s.timestamp; + let (subedge_src, subedge_dst) = { + if let CGWUCentralEventStateClientsType::Wired(_) = link.client_type { + ( + CGWUCentralTopologySubEdge { + serial: s.local_mac, + port: CGWUCentralTopologySubEdgePort::PhysicalWiredPort( + link.local_port.clone(), + ), + }, + CGWUCentralTopologySubEdge { + serial: link.remote_serial, + // TODO: Duplex speed? + port: CGWUCentralTopologySubEdgePort::WiredClient, + }, + ) + } else if let CGWUCentralEventStateClientsType::Wireless(ts, ssid, band) = + &link.client_type + { + // Since wireless association explicitly reports the + // timestamp for when link's been established, we can + // use this value reported from AP. + // For any other case (LLDP, wired), we use + // the event's base timestamp value; + link_timestamp = *ts; + + ( + CGWUCentralTopologySubEdge { + serial: s.local_mac, + port: CGWUCentralTopologySubEdgePort::WirelessPort, + }, + CGWUCentralTopologySubEdge { + serial: link.remote_serial, + port: CGWUCentralTopologySubEdgePort::WirelessClient( + ssid.clone(), + band.clone(), + ), + }, + ) + } else { + continue; + } + }; + + // In case when client silently migrates from AP1 to AP2, + // we have to explicitly remove that from AP1, + // and 'migrate' it to AP2. + // Do this only using , to make sure + // we clear only unique destination (wifi client on band X, + // for example) edge counterparts. + // NOTE: deleting subedge will remove both SRC and DST + // from map, as map stores them separately. + Self::remove_edge(&mut lock, &subedge_dst); + + Self::add_node( + &mut lock, + &link.remote_serial, + CGWUCentralTopologyMapNodeOrigin::StateWiredWireless, + ); + + if link.is_downstream { + Self::add_edge( + &mut lock, + CGWUCentralTopologyEdge(subedge_src, subedge_dst), + CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(link_timestamp), + ); + } else { + Self::add_edge( + &mut lock, + CGWUCentralTopologyEdge(subedge_dst, subedge_src), + CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(link_timestamp), + ); } } } } - pub async fn process_device_topology_event(self: &Self) {} + pub async fn process_device_topology_event( + self: &Self, + _device_type: &CGWDeviceType, + evt: CGWUCentralEvent, + ) { + struct ExistingEdge { + idx: EdgeIndex, + timestamp: EdgeCreationTimestamp, + key: CGWUCentralTopologyEdge, + } + let mut lock = self.data.write().await; + let mut existing_edge: Option = None; - fn add_node(data: &mut CGWUcentralTopologyMapData, node_mac: &MacAddress) -> NodeIndex { - match data.node_idx_map.get(node_mac) { + if let CGWUCentralEventType::RealtimeEvent(rt) = evt.evt_type { + if let CGWUCentralEventRealtimeEventType::WirelessClientJoin(rt_j) = &rt.evt_type { + for key in lock.edge_idx_map.keys() { + // Try to find edge: + // we're looking for an edge with wireless client + // () with specific () properties. + // However, the check is global: + // We do not care AP reported the client initially: + // since the new client can appear on any given AP that + // is connected to us, we have to make sure that if + // AP2 receives client.join, and client serial is already + // associated with AP1, the connection edge between + // AP1 and should be purged, + // and then assigned to AP2. + // + // This only applies, however, to the join message. + // Late-leave events should be ignored, in case if + // client appears on new/other AP. + if let CGWUCentralTopologySubEdgePort::WirelessClient(_, dst_band) = &key.1.port + { + if key.1.serial == rt_j.client && *dst_band == *rt_j.band { + if let Some((edge_idx, edge_origin)) = lock.edge_idx_map.get(key) { + if let CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless( + edge_timestamp, + ) = edge_origin + { + existing_edge = Some(ExistingEdge { + idx: *edge_idx, + timestamp: edge_timestamp.clone(), + key: key.to_owned(), + }); + break; + } + } + } + } + } + if let Some(e) = existing_edge { + // New client joined, and new event timestamp is bigger (newer): + // - delete existing edge from map; + // - update graph; + if rt.timestamp > e.timestamp { + let _ = lock.graph.remove_edge(e.idx); + + // Remove SRC (tuple idx 0 == src) -> DST (idx 1 == dst) edge + let mut edge = CGWUCentralTopologyEdge(e.key.0, e.key.1); + let _ = lock.edge_idx_map.remove(&edge); + + // We do not delete the leaf-disconnected bode, + // as we will try to recreate it later on anyways. + + // Remove DST (tuple idx 1 == dst) -> SRC (idx 0 == src) edge + edge = CGWUCentralTopologyEdge(edge.1, edge.0); + let _ = lock.edge_idx_map.remove(&edge); + } else { + warn!( + "Received late join event: event ts {:?} vs existing edge ts {:?}", + rt.timestamp, e.timestamp + ); + // New event is a late-reported / processed event; + // We can safely skip it; + return; + } + } + + // Now simply update internall state: + // - create node (if doesnt exist already) + // - create edge; + // - update graph; + Self::add_node( + &mut lock, + &rt_j.client, + CGWUCentralTopologyMapNodeOrigin::StateWiredWireless, + ); + + let (subedge_src, subedge_dst) = { + ( + CGWUCentralTopologySubEdge { + serial: evt.serial.clone(), + port: CGWUCentralTopologySubEdgePort::WirelessPort, + }, + CGWUCentralTopologySubEdge { + serial: rt_j.client, + port: CGWUCentralTopologySubEdgePort::WirelessClient( + rt_j.ssid.clone(), + rt_j.band.clone(), + ), + }, + ) + }; + Self::add_edge( + &mut lock, + CGWUCentralTopologyEdge(subedge_src, subedge_dst), + CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless(rt.timestamp), + ); + } else if let CGWUCentralEventRealtimeEventType::WirelessClientLeave(rt_l) = rt.evt_type + { + for key in lock.edge_idx_map.keys() { + // Try to find edge: + // we're looking for an edge with wireless client + // () with specific () properties, which is also + // reported by the AP, as it's a leave event + // (AP1 can't expect us to delete existing edge, if AP2 + // is already associated with this client) + if let CGWUCentralTopologySubEdgePort::WirelessClient(_, dst_band) = &key.1.port + { + if key.1.serial == rt_l.client && *dst_band == *rt_l.band && + // Part that checks if AP that reports also + // is associated with this client. + // If not - it's a 'late' leave event that can be ignored. + key.0.serial == evt.serial + { + if let Some((edge_idx, edge_origin)) = lock.edge_idx_map.get(key) { + if let CGWUCentralTopologyMapEdgeOrigin::StateWiredWireless( + edge_timestamp, + ) = edge_origin + { + existing_edge = Some(ExistingEdge { + idx: *edge_idx, + timestamp: edge_timestamp.clone(), + key: key.to_owned(), + }); + break; + } + } + } + } + } + if let Some(e) = existing_edge { + // We still have to check whether this leave message + // is newer than the existing timestamp: + // It's possible that state + leave events were shuffled, + // in a way that leave gets processed only after state's + // processing's been completed. + // This results in a discardtion of the late leave event. + if rt.timestamp > e.timestamp { + let _ = lock.graph.remove_edge(e.idx); + + // Remove SRC (tuple idx 0 == src) -> DST (idx 1 == dst) edge + let mut edge = CGWUCentralTopologyEdge(e.key.0, e.key.1); + let _ = lock.edge_idx_map.remove(&edge); + + // Also remove dst node if it's a leaf-disconnected node + Self::remove_disconnected_leaf_node(&mut lock, &edge.1.serial); + + // Remove DST (tuple idx 1 == dst) -> SRC (idx 0 == src) edge + edge = CGWUCentralTopologyEdge(edge.1, edge.0); + let _ = lock.edge_idx_map.remove(&edge); + } else { + warn!( + "Received late leave event: event ts {:?} vs existing edge ts {:?}", + rt.timestamp, e.timestamp + ); + // New event is a late-reported / processed event; + // We can safely skip it; + return; + } + } + } + } + } + + fn add_node( + data: &mut CGWUCentralTopologyMapData, + node_mac: &MacAddress, + origin: CGWUCentralTopologyMapNodeOrigin, + ) -> NodeIndex { + match data.node_idx_map.get_mut(node_mac) { None => { - let idx = data.graph.add_node(node_mac.to_hex_string()); - let _ = data.node_idx_map.insert(node_mac.clone(), idx); + let idx = data.graph.add_node(*node_mac); + let _ = data.node_idx_map.insert(node_mac.clone(), (idx, origin)); idx } - Some(idx) => *idx, + + Some((idx, existing_origin)) => { + if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = existing_origin { + *idx + } else { + *existing_origin = origin; + *idx + } + } } // TODO: handle case: // this either means that we detected this node at some new @@ -125,11 +479,56 @@ impl CGWUcentralTopologyMap { // e.g. delete all connected edges, child nodes etc etc; } - fn remove_node(data: &mut CGWUcentralTopologyMapData, node_mac: &MacAddress) { - if let Some(node) = data.node_idx_map.remove(node_mac) { + // Checks before removal, safe to call + fn remove_disconnected_leaf_node(data: &mut CGWUCentralTopologyMapData, node_mac: &MacAddress) { + let mut node_idx_to_remove: Option = None; + + if let Some((node_idx, origin)) = data.node_idx_map.get(node_mac) { + // Skip this node, as it's origin is known from + // uCentral connection, not state data. + if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = origin { + debug!("Not removing disconnected leaf {:?} - reason: uCentral device (direct connection to CGW)", node_mac); + return; + } + + let mut edges = data + .graph + // We're interested only if there are edges for + // this (potentially) disconnected leaf-node + .neighbors_directed(*node_idx, Direction::Incoming) + .detach(); + + if let None = edges.next_edge(&data.graph) { + node_idx_to_remove = Some(*node_idx); + } + } + + if let Some(node_idx) = node_idx_to_remove { + debug!("MAC {:?} is a disconnected leaf node, removing", node_mac); + data.node_idx_map.remove(node_mac); + data.graph.remove_node(node_idx); + } + } + + fn remove_node(data: &mut CGWUCentralTopologyMapData, node_mac: &MacAddress) { + if let Some((node, _)) = data.node_idx_map.remove(node_mac) { + // 'Potential' list of nodes we can safely remove. + // Duplicates may exist, because multiple edges can originate + // from src node (AP, switch) to a single other node + // (for example client's connected both through + // the WiFi and the cable, or client's connected + // to the AP on multiple bands etc). + // + // Not every node from this list gets removed, as + // once again: node (client) can be connected to + // multiple APs at once on different bands, + // or client's seen for example both on WiFi + // and cable. + let mut nodes_to_remove: Vec = Vec::new(); + let mut edges_to_remove: Vec = Vec::new(); - let mut map_keys_to_remove: Vec = Vec::new(); - let edges = [ + let mut map_edge_keys_to_remove: Vec = Vec::new(); + let mut edges = [ data.graph .neighbors_directed(node, Direction::Outgoing) .detach(), @@ -138,50 +537,138 @@ impl CGWUcentralTopologyMap { .detach(), ]; - for mut x in edges { - while let Some(edge) = x.next_edge(&data.graph) { - data.graph.remove_edge(edge); - edges_to_remove.push(edge); + while let Some(edge) = edges[0].next_edge(&data.graph) { + // We iterate over edges that are connected with this SRC + // node, and collect all the destination Node indexes, + // to check them afterwards whether they still have + // some edges connected to them. + // If not - we remove the nodes out off the internal map. + // NOTE: It's possible that two Websocket devices are + // connected and we'll try to remove DST node even though + // knowledge about this device's presence in our map + // originates from WSS connection, not state message. + // However, internal map also has meta information + // about the origin of appearence in map, hence + // it solves the issue. + // (if node.origin == WSS then ) + // + // NOTE: we do this only for neighbors + // From treeview-graph perspective, we're clearing + // nodes that originate from this device. + if let Some((_, node_dst)) = data.graph.edge_endpoints(edge) { + nodes_to_remove.push(node_dst); } + + data.graph.remove_edge(edge); + edges_to_remove.push(edge); } - for (k, e) in &data.edge_idx_map { - for x in &edges_to_remove { - if x == e { - map_keys_to_remove.push(k.to_owned()); + + while let Some(edge) = edges[1].next_edge(&data.graph) { + data.graph.remove_edge(edge); + edges_to_remove.push(edge); + } + + for node_idx in nodes_to_remove { + let mut node_edges = data + .graph + .neighbors_directed(node_idx, Direction::Incoming) + .detach(); + + // Check if at least one edge is connecting this + // Node; If not - purge it, but only if this node + // has been added through the means of State message + // or realtime events. + // + // If it's an active WSS connection we have established, + // we should skip this node, as it's not our responsibility + // here to destroy it. + if let None = node_edges.next_edge(&data.graph) { + let mut node_to_remove: Option<&MacAddress> = None; + if let Some(node_weight) = data.graph.node_weight(node_idx) { + if let Some((_, origin)) = data.node_idx_map.get(&node_weight) { + // Skip this node, as it's origin is known from + // uCentral connection, not state data. + if let CGWUCentralTopologyMapNodeOrigin::UCentralDevice = origin { + debug!("Not removing disconnected leaf {:?} - reason: uCentral device (direct connection to CGW)", node_weight); + continue; + } + + node_to_remove = Some(node_weight); + } + } + + if let Some(node_mac) = node_to_remove { + data.node_idx_map.remove(node_mac); + data.graph.remove_node(node_idx); } } } - for x in map_keys_to_remove { - data.edge_idx_map.remove(&x); + + for (k, e) in &data.edge_idx_map { + for x in &edges_to_remove { + if *x == e.0 { + map_edge_keys_to_remove.push(k.to_owned()); + } + } + } + + for key in map_edge_keys_to_remove { + data.edge_idx_map.remove(&key); } data.graph.remove_node(node); } } - fn add_edge(data: &mut CGWUcentralTopologyMapData, edge: CGWUcentralTopologyEdge) { - let node_src_subedge: CGWUcentralTopologySubEdge = edge.0; - let node_dst_subedge: CGWUcentralTopologySubEdge = edge.1; - let node_src_idx = Self::add_node(data, &node_src_subedge.mac); - let node_dst_idx = Self::add_node(data, &node_dst_subedge.mac); + fn add_edge( + data: &mut CGWUCentralTopologyMapData, + edge: CGWUCentralTopologyEdge, + origin: CGWUCentralTopologyMapEdgeOrigin, + ) { + let node_src_subedge: CGWUCentralTopologySubEdge = edge.0; + let node_dst_subedge: CGWUCentralTopologySubEdge = edge.1; + let (node_src_idx, node_dst_idx) = { + ( + match data.node_idx_map.get(&node_src_subedge.serial) { + Some((idx, _)) => *idx, + None => { + warn!( + "Tried to add edge for non-existing node {:?}", + node_src_subedge.serial + ); + return; + } + }, + match data.node_idx_map.get(&node_dst_subedge.serial) { + Some((idx, _)) => *idx, + None => { + warn!( + "Tried to add edge for non-existing node {:?}", + node_dst_subedge.serial + ); + return; + } + }, + ) + }; let edge_idx = data.graph.add_edge( node_src_idx, node_dst_idx, - format!("{}<->{}", node_src_subedge.port, node_dst_subedge.port), + format!("{:?}<->{:?}", node_src_subedge.port, node_dst_subedge.port), ); data.edge_idx_map.insert( - CGWUcentralTopologyEdge(node_src_subedge.clone(), node_dst_subedge.clone()), - edge_idx, + CGWUCentralTopologyEdge(node_src_subedge.clone(), node_dst_subedge.clone()), + (edge_idx, origin.clone()), ); data.edge_idx_map.insert( - CGWUcentralTopologyEdge(node_dst_subedge, node_src_subedge), - edge_idx, + CGWUCentralTopologyEdge(node_dst_subedge, node_src_subedge), + (edge_idx, origin.clone()), ); } - fn remove_edge(data: &mut CGWUcentralTopologyMapData, subedge: &CGWUcentralTopologySubEdge) { - let mut keys_to_remove: Vec = Vec::new(); + fn remove_edge(data: &mut CGWUCentralTopologyMapData, subedge: &CGWUCentralTopologySubEdge) { + let mut keys_to_remove: Vec = Vec::new(); for key in data.edge_idx_map.keys() { if key.0 == *subedge || key.1 == *subedge { @@ -191,8 +678,8 @@ impl CGWUcentralTopologyMap { } if let Some(key) = keys_to_remove.get(0) { - if let Some(v) = data.edge_idx_map.get(key) { - data.graph.remove_edge(*v); + if let Some((edge_idx, _)) = data.edge_idx_map.get(key) { + data.graph.remove_edge(*edge_idx); } } @@ -211,7 +698,13 @@ impl CGWUcentralTopologyMap { &|_, er| { format!("label = \"{}\"", er.weight()) }, &|_, nr| { format!("label = \"{}\" shape=\"record\"", nr.weight()) } ) + ) + .replace("digraph {", "digraph {\n\trankdir=LR;\n"); + debug!( + "graph dump: {} {}\n{}", + lock.node_idx_map.len(), + lock.edge_idx_map.len(), + dotfmt ); - info!("graph dump:\n{}", dotfmt); } } diff --git a/utils/cert_generator/generate_certs.sh b/utils/cert_generator/generate_certs.sh index 09370b1..9512dfe 100755 --- a/utils/cert_generator/generate_certs.sh +++ b/utils/cert_generator/generate_certs.sh @@ -50,10 +50,10 @@ gen_cert() local key=$3 local cert=$4 # generate key and request to sign - openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha512 -days 365 \ + openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha256 -days 365 \ -extensions $type -subj "/CN=$cn" -out $req_file -keyout $key &> /dev/null # sign certificate - openssl x509 -extfile $CONF_FILE -CA $CA_CERT -CAkey $CA_KEY -CAcreateserial -sha512 -days 365 \ + openssl x509 -extfile $CONF_FILE -CA $CA_CERT -CAkey $CA_KEY -CAcreateserial -sha256 -days 365 \ -in $req_file -out $pem if [ $? == "0" ] then @@ -133,7 +133,7 @@ if [ "$GEN_CA" == "y" ] then echo Generating root CA certificate mkdir -p $CA_DIR - openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha512 -days 365 \ + openssl req -config $CONF_FILE -x509 -nodes -newkey rsa:4096 -sha256 -days 365 \ -extensions ca -subj "/CN=CA" -out $CA_CERT -keyout $CA_KEY &> /dev/null fi