rs-simple: add flow mgmt w/ TTL hash maps (moka-future)

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2025-05-03 15:22:57 +02:00
parent 8848420a72
commit dd909adeb8
2 changed files with 102 additions and 23 deletions

View File

@@ -6,9 +6,15 @@ edition = "2024"
[dependencies]
bytes = "1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tui = "0.19.0"
crossterm = "0.29.0"
io = "0.0.2"
moka = { version = "0.12.10", features = ["future"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tui = "0.19.0"
[profile.release]
strip = true
lto = true
codegen-units = 1

View File

@@ -1,25 +1,30 @@
use bytes::BytesMut;
use crossterm::{
cursor,
event::{self, KeyCode, KeyEvent},
ExecutableCommand,
terminal::{self, ClearType},
event::{self, KeyCode, KeyEvent},
cursor,
};
use moka::{future::Cache, Expiry};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{io::self, sync::Arc};
use std::{
hash::{Hash, Hasher},
io::self,
sync::Arc,
time::{Duration, Instant},
};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;
use tokio::net::TcpStream;
use tokio::time::Duration;
use tui::{
backend::CrosstermBackend,
Terminal,
widgets::{Block, Borders, List, ListItem},
layout::{Layout, Constraint, Direction},
style::{Style, Color},
Terminal,
widgets::{Block, Borders, List, ListItem},
};
#[derive(Debug)]
@@ -46,6 +51,12 @@ enum EventName {
NotDetected,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
enum State {
Info, Finished,
}
#[derive(Serialize, Deserialize, Debug)]
struct FlowEvent {
#[serde(rename = "flow_event_name")]
@@ -53,7 +64,9 @@ struct FlowEvent {
#[serde(rename = "flow_id")]
id: u64,
#[serde(rename = "flow_state")]
state: String,
state: State,
#[serde(rename = "flow_idle_time")]
idle_time: u64,
#[serde(rename = "flow_src_packets_processed")]
src_packets_processed: u64,
#[serde(rename = "flow_dst_packets_processed")]
@@ -66,7 +79,6 @@ struct FlowEvent {
#[derive(Serialize, Deserialize, Debug)]
struct PacketEvent {
packet_event_name: String,
pkt_datalink: u16,
pkt_caplen: u64,
pkt_len: u64,
@@ -77,11 +89,13 @@ struct PacketEvent {
enum EventType {
Flow(FlowEvent),
Packet(PacketEvent),
Other(),
}
#[derive(Default)]
struct Stats {
ui_updates: u64,
flow_count: u64,
parse_errors: u64,
events: u64,
flow_events: u64,
@@ -91,6 +105,53 @@ struct Stats {
total_l4_len: u64,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Expiration {
FlowIdleTime(u64),
}
struct FlowExpiry;
#[derive(Eq, Default, Debug)]
struct FlowKey {
id: u64,
}
#[derive(Clone, Default, Debug)]
struct FlowValue {
}
impl Expiration {
fn as_duration(&self) -> Option<Duration> {
match self {
Expiration::FlowIdleTime(value) => Some(Duration::from_micros(*value)),
}
}
}
impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry {
fn expire_after_create(
&self,
_key: &FlowKey,
value: &(Expiration, FlowValue),
_current_time: Instant,
) -> Option<Duration> {
value.0.as_duration()
}
}
impl Hash for FlowKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}
impl PartialEq for FlowKey {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
#[tokio::main]
async fn main() {
let server_address = "127.0.0.1:7000";
@@ -125,14 +186,19 @@ async fn main() {
let data_rx = Arc::clone(&data);
tokio::spawn(async move {
let expiry = FlowExpiry;
let flow_cache: Cache<FlowKey, (Expiration, FlowValue)> = Cache::builder()
.expire_after(expiry)
.build();
while let Some(msg) = rx.recv().await {
match parse_json(&msg) {
Ok(message) => {
let mut data_lock = data_tx.lock().await;
data_lock.events += 1;
update_stats(&message, &mut data_lock);
update_stats(&message, &mut data_lock, &flow_cache).await;
}
Err(_) => {
Err(_message) => {
let mut data_lock = data_tx.lock().await;
data_lock.parse_errors += 1;
}
@@ -169,7 +235,7 @@ async fn main() {
};
let mut data_lock = data_rx.lock().await;
data_lock.ui_updates += 1;
draw_ui(&mut terminal.as_mut().unwrap(), &data_lock);
draw_ui(terminal.as_mut().unwrap(), &data_lock);
}
if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) {
@@ -208,12 +274,9 @@ fn parse_message(buffer: &mut BytesMut) -> Option<String> {
}
fn parse_json(data: &str) -> Result<EventType, ParseError> {
let first_non_digit = data.find(|c: char| !c.is_digit(10)).unwrap_or(0);
let first_non_digit = data.find(|c: char| !c.is_ascii_digit()).unwrap_or(0);
let length_str = &data[0..first_non_digit];
let length: usize = match length_str.parse() {
Ok(value) => value,
Err(_) => 0
};
let length: usize = length_str.parse().unwrap_or(0);
if length == 0 {
return Err(ParseError::Protocol());
}
@@ -226,15 +289,23 @@ fn parse_json(data: &str) -> Result<EventType, ParseError> {
} else if value.get("packet_event_name").is_some() {
let packet_event: PacketEvent = serde_json::from_value(value)?;
return Ok(EventType::Packet(packet_event));
} else if value.get("daemon_event_name").is_some() ||
value.get("error_event_name").is_some() {
return Ok(EventType::Other());
}
return Err(ParseError::Schema());
Err(ParseError::Schema())
}
fn update_stats(event: &EventType, stats: &mut MutexGuard<Stats>) {
async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache<FlowKey, (Expiration, FlowValue)>) {
match &event {
EventType::Flow(_) => {
EventType::Flow(flow_event) => {
stats.flow_events += 1;
stats.flow_count = cache.entry_count();
let key = FlowKey { id: flow_event.id };
let value = FlowValue {};
cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await;
}
EventType::Packet(packet_event) => {
stats.packet_events += 1;
@@ -242,12 +313,14 @@ fn update_stats(event: &EventType, stats: &mut MutexGuard<Stats>) {
stats.total_len += packet_event.pkt_len;
stats.total_l4_len += packet_event.pkt_l4_len;
}
EventType::Other() => {}
}
}
fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGuard<Stats>) {
let general_items = vec![
ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()),
ListItem::new("Flow Count...: ".to_owned() + &data.flow_count.to_string()),
ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()),
ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()),
ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()),