From dd909adeb846a6e609aba8a651f1e43dd6b56874 Mon Sep 17 00:00:00 2001 From: Toni Uhlig Date: Sat, 3 May 2025 15:22:57 +0200 Subject: [PATCH] rs-simple: add flow mgmt w/ TTL hash maps (moka-future) Signed-off-by: Toni Uhlig --- examples/rs-simple/Cargo.toml | 14 +++-- examples/rs-simple/src/main.rs | 111 +++++++++++++++++++++++++++------ 2 files changed, 102 insertions(+), 23 deletions(-) diff --git a/examples/rs-simple/Cargo.toml b/examples/rs-simple/Cargo.toml index a2e242a02..b5e0eebb5 100644 --- a/examples/rs-simple/Cargo.toml +++ b/examples/rs-simple/Cargo.toml @@ -6,9 +6,15 @@ edition = "2024" [dependencies] bytes = "1" -tokio = { version = "1", features = ["full"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1.0" -tui = "0.19.0" crossterm = "0.29.0" io = "0.0.2" +moka = { version = "0.12.10", features = ["future"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +tui = "0.19.0" + +[profile.release] +strip = true +lto = true +codegen-units = 1 diff --git a/examples/rs-simple/src/main.rs b/examples/rs-simple/src/main.rs index a326dc377..1647c5c70 100644 --- a/examples/rs-simple/src/main.rs +++ b/examples/rs-simple/src/main.rs @@ -1,25 +1,30 @@ use bytes::BytesMut; use crossterm::{ + cursor, + event::{self, KeyCode, KeyEvent}, ExecutableCommand, terminal::{self, ClearType}, - event::{self, KeyCode, KeyEvent}, - cursor, }; +use moka::{future::Cache, Expiry}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::{io::self, sync::Arc}; +use std::{ + hash::{Hash, Hasher}, + io::self, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::sync::MutexGuard; use tokio::net::TcpStream; -use tokio::time::Duration; use tui::{ backend::CrosstermBackend, - Terminal, - widgets::{Block, Borders, List, ListItem}, layout::{Layout, Constraint, Direction}, style::{Style, Color}, + Terminal, + widgets::{Block, Borders, List, ListItem}, }; #[derive(Debug)] @@ -46,6 +51,12 @@ enum EventName { NotDetected, } +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "lowercase")] +enum State { + Info, Finished, +} + #[derive(Serialize, Deserialize, Debug)] struct FlowEvent { #[serde(rename = "flow_event_name")] @@ -53,7 +64,9 @@ struct FlowEvent { #[serde(rename = "flow_id")] id: u64, #[serde(rename = "flow_state")] - state: String, + state: State, + #[serde(rename = "flow_idle_time")] + idle_time: u64, #[serde(rename = "flow_src_packets_processed")] src_packets_processed: u64, #[serde(rename = "flow_dst_packets_processed")] @@ -66,7 +79,6 @@ struct FlowEvent { #[derive(Serialize, Deserialize, Debug)] struct PacketEvent { - packet_event_name: String, pkt_datalink: u16, pkt_caplen: u64, pkt_len: u64, @@ -77,11 +89,13 @@ struct PacketEvent { enum EventType { Flow(FlowEvent), Packet(PacketEvent), + Other(), } #[derive(Default)] struct Stats { ui_updates: u64, + flow_count: u64, parse_errors: u64, events: u64, flow_events: u64, @@ -91,6 +105,53 @@ struct Stats { total_l4_len: u64, } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum Expiration { + FlowIdleTime(u64), +} + +struct FlowExpiry; + +#[derive(Eq, Default, Debug)] +struct FlowKey { + id: u64, +} + +#[derive(Clone, Default, Debug)] +struct FlowValue { +} + +impl Expiration { + fn as_duration(&self) -> Option { + match self { + Expiration::FlowIdleTime(value) => Some(Duration::from_micros(*value)), + } + } +} + +impl Expiry for FlowExpiry { + fn expire_after_create( + &self, + _key: &FlowKey, + value: &(Expiration, FlowValue), + _current_time: Instant, + ) -> Option { + value.0.as_duration() + } +} + +impl Hash for FlowKey { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + +impl PartialEq for FlowKey { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + #[tokio::main] async fn main() { let server_address = "127.0.0.1:7000"; @@ -125,14 +186,19 @@ async fn main() { let data_rx = Arc::clone(&data); tokio::spawn(async move { + let expiry = FlowExpiry; + let flow_cache: Cache = Cache::builder() + .expire_after(expiry) + .build(); + while let Some(msg) = rx.recv().await { match parse_json(&msg) { Ok(message) => { let mut data_lock = data_tx.lock().await; data_lock.events += 1; - update_stats(&message, &mut data_lock); + update_stats(&message, &mut data_lock, &flow_cache).await; } - Err(_) => { + Err(_message) => { let mut data_lock = data_tx.lock().await; data_lock.parse_errors += 1; } @@ -169,7 +235,7 @@ async fn main() { }; let mut data_lock = data_rx.lock().await; data_lock.ui_updates += 1; - draw_ui(&mut terminal.as_mut().unwrap(), &data_lock); + draw_ui(terminal.as_mut().unwrap(), &data_lock); } if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) { @@ -208,12 +274,9 @@ fn parse_message(buffer: &mut BytesMut) -> Option { } fn parse_json(data: &str) -> Result { - let first_non_digit = data.find(|c: char| !c.is_digit(10)).unwrap_or(0); + let first_non_digit = data.find(|c: char| !c.is_ascii_digit()).unwrap_or(0); let length_str = &data[0..first_non_digit]; - let length: usize = match length_str.parse() { - Ok(value) => value, - Err(_) => 0 - }; + let length: usize = length_str.parse().unwrap_or(0); if length == 0 { return Err(ParseError::Protocol()); } @@ -226,15 +289,23 @@ fn parse_json(data: &str) -> Result { } else if value.get("packet_event_name").is_some() { let packet_event: PacketEvent = serde_json::from_value(value)?; return Ok(EventType::Packet(packet_event)); + } else if value.get("daemon_event_name").is_some() || + value.get("error_event_name").is_some() { + return Ok(EventType::Other()); } - return Err(ParseError::Schema()); + Err(ParseError::Schema()) } -fn update_stats(event: &EventType, stats: &mut MutexGuard) { +async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache) { match &event { - EventType::Flow(_) => { + EventType::Flow(flow_event) => { stats.flow_events += 1; + stats.flow_count = cache.entry_count(); + + let key = FlowKey { id: flow_event.id }; + let value = FlowValue {}; + cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await; } EventType::Packet(packet_event) => { stats.packet_events += 1; @@ -242,12 +313,14 @@ fn update_stats(event: &EventType, stats: &mut MutexGuard) { stats.total_len += packet_event.pkt_len; stats.total_l4_len += packet_event.pkt_l4_len; } + EventType::Other() => {} } } fn draw_ui(terminal: &mut Terminal, data: &MutexGuard) { let general_items = vec![ ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()), + ListItem::new("Flow Count...: ".to_owned() + &data.flow_count.to_string()), ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()), ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()), ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()),