rs-simple: added DaemonEventStatus deserialization and statistics mgmt

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2025-05-16 17:48:51 +02:00
parent ae6864d4e4
commit 46ef266139

View File

@@ -9,6 +9,7 @@ use moka::{future::Cache, Expiry};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::HashMap,
fmt,
hash::{Hash, Hasher},
io::self,
@@ -58,12 +59,32 @@ enum State {
Unknown, Info, Finished,
}
#[derive(Serialize, Deserialize, Debug)]
struct FlowEventNdpiFlowRisk {
#[serde(rename = "risk")]
risk: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct FlowEventNdpi {
#[serde(rename = "proto")]
proto: String,
#[serde(rename = "flow_risk")]
risks: Option<HashMap<String, FlowEventNdpiFlowRisk>>,
}
#[derive(Serialize, Deserialize, Debug)]
struct FlowEvent {
#[serde(rename = "flow_event_name")]
name: EventName,
#[serde(rename = "flow_id")]
id: u64,
#[serde(rename = "alias")]
alias: String,
#[serde(rename = "source")]
source: String,
#[serde(rename = "thread_id")]
thread_id: u64,
#[serde(rename = "flow_state")]
state: State,
#[serde(rename = "flow_first_seen")]
@@ -82,6 +103,12 @@ struct FlowEvent {
src_tot_l4_payload_len: u64,
#[serde(rename = "flow_dst_tot_l4_payload_len")]
dst_tot_l4_payload_len: u64,
#[serde(rename = "l3_proto")]
l3_proto: String,
#[serde(rename = "l4_proto")]
l4_proto: String,
#[serde(rename = "ndpi")]
ndpi: Option<FlowEventNdpi>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -92,10 +119,61 @@ struct PacketEvent {
pkt_l4_len: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct DaemonEventStatus {
#[serde(rename = "alias")]
alias: String,
#[serde(rename = "source")]
source: String,
#[serde(rename = "thread_id")]
thread_id: u64,
#[serde(rename = "packets-captured")]
packets_captured: u64,
#[serde(rename = "packets-processed")]
packets_processed: u64,
#[serde(rename = "total-skipped-flows")]
total_skipped_flows: u64,
#[serde(rename = "total-l4-payload-len")]
total_l4_payload_len: u64,
#[serde(rename = "total-not-detected-flows")]
total_not_detected_flows: u64,
#[serde(rename = "total-guessed-flows")]
total_guessed_flows: u64,
#[serde(rename = "total-detected-flows")]
total_detected_flows: u64,
#[serde(rename = "total-detection-updates")]
total_detection_updates: u64,
#[serde(rename = "total-updates")]
total_updates: u64,
#[serde(rename = "current-active-flows")]
current_active_flows: u64,
#[serde(rename = "total-active-flows")]
total_active_flows: u64,
#[serde(rename = "total-idle-flows")]
total_idle_flows: u64,
#[serde(rename = "total-compressions")]
total_compressions: u64,
#[serde(rename = "total-compression-diff")]
total_compression_diff: u64,
#[serde(rename = "current-compression-diff")]
current_compression_diff: u64,
#[serde(rename = "global-alloc-bytes")]
global_alloc_bytes: u64,
#[serde(rename = "global-alloc-count")]
global_alloc_count: u64,
#[serde(rename = "global-free-bytes")]
global_free_bytes: u64,
#[serde(rename = "global-free-count")]
global_free_count: u64,
#[serde(rename = "total-events-serialized")]
total_events_serialized: u64,
}
#[derive(Debug)]
enum EventType {
Flow(FlowEvent),
Packet(PacketEvent),
DaemonStatus(DaemonEventStatus),
Other(),
}
@@ -107,14 +185,30 @@ struct Stats {
events: u64,
flow_events: u64,
packet_events: u64,
total_caplen: u64,
total_len: u64,
total_l4_len: u64,
daemon_events: u64,
packet_events_total_caplen: u64,
packet_events_total_len: u64,
packet_events_total_l4_len: u64,
packets_captured: u64,
packets_processed: u64,
flows_total_skipped: u64,
flows_total_l4_payload_len: u64,
flows_total_not_detected: u64,
flows_total_guessed: u64,
flows_current_active: u64,
flows_total_compressions: u64,
flows_total_compression_diff: u64,
flows_current_compression_diff: u64,
global_alloc_bytes: u64,
global_alloc_count: u64,
global_free_bytes: u64,
global_free_count: u64,
total_events_serialized: u64,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Expiration {
FlowIdleTime(u64),
enum FlowExpiration {
IdleTime(u64),
}
struct FlowExpiry;
@@ -122,6 +216,9 @@ struct FlowExpiry;
#[derive(Clone, Eq, Default, Debug)]
struct FlowKey {
id: u64,
alias: String,
source: String,
thread_id: u64,
}
#[derive(Clone, Debug)]
@@ -134,6 +231,16 @@ struct FlowValue {
first_seen: std::time::SystemTime,
last_seen: std::time::SystemTime,
timeout_in: std::time::SystemTime,
risks: usize,
proto: String,
app_proto: String,
}
#[derive(Clone, Eq, Default, Debug)]
struct DaemonKey {
alias: String,
source: String,
thread_id: u64,
}
impl Default for State {
@@ -152,15 +259,15 @@ impl fmt::Display for State {
}
}
impl Expiration {
impl FlowExpiration {
fn as_duration(&self) -> Option<Duration> {
match self {
Expiration::FlowIdleTime(value) => Some(Duration::from_micros(*value)),
FlowExpiration::IdleTime(value) => Some(Duration::from_micros(*value)),
}
}
}
impl fmt::Display for Expiration {
impl fmt::Display for FlowExpiration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.as_duration() {
Some(duration) => {
@@ -172,11 +279,11 @@ impl fmt::Display for Expiration {
}
}
impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry {
impl Expiry<FlowKey, (FlowExpiration, FlowValue)> for FlowExpiry {
fn expire_after_create(
&self,
_key: &FlowKey,
value: &(Expiration, FlowValue),
value: &(FlowExpiration, FlowValue),
_current_time: Instant,
) -> Option<Duration> {
value.0.as_duration()
@@ -185,13 +292,35 @@ impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry {
impl Hash for FlowKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state)
self.id.hash(state);
self.alias.hash(state);
self.source.hash(state);
self.thread_id.hash(state);
}
}
impl PartialEq for FlowKey {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
self.id == other.id &&
self.alias == other.alias &&
self.source == other.source &&
self.thread_id == other.thread_id
}
}
impl Hash for DaemonKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.alias.hash(state);
self.source.hash(state);
self.thread_id.hash(state);
}
}
impl PartialEq for DaemonKey {
fn eq(&self, other: &Self) -> bool {
self.alias == other.alias &&
self.source == other.source &&
self.thread_id == other.thread_id
}
}
@@ -227,10 +356,13 @@ async fn main() {
let data = Arc::new(Mutex::new(Stats::default()));
let data_tx = Arc::clone(&data);
let data_rx = Arc::clone(&data);
let flow_cache: Arc<Cache<FlowKey, (Expiration, FlowValue)>> = Arc::new(Cache::builder()
let flow_cache: Arc<Cache<FlowKey, (FlowExpiration, FlowValue)>> = Arc::new(Cache::builder()
.expire_after(FlowExpiry)
.build());
let flow_cache_rx = Arc::clone(&flow_cache);
let daemon_cache: Arc<Cache<DaemonKey, DaemonEventStatus>> = Arc::new(Cache::builder()
.time_to_live(Duration::from_secs(1800))
.build());
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
@@ -238,7 +370,7 @@ async fn main() {
Ok(message) => {
let mut data_lock = data_tx.lock().await;
data_lock.events += 1;
update_stats(&message, &mut data_lock, &flow_cache).await;
update_stats(&message, &mut data_lock, &flow_cache, &daemon_cache).await;
}
Err(_message) => {
let mut data_lock = data_tx.lock().await;
@@ -271,7 +403,7 @@ async fn main() {
let mut table_state = TableState::default();
loop {
let flows: Vec<(FlowKey, (Expiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect();
let flows: Vec<(FlowKey, (FlowExpiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect();
let mut table_selected = match table_state.selected() {
Some(table_index) => {
if flows.len() > 0 && table_index >= flows.len() {
@@ -303,15 +435,15 @@ async fn main() {
table_selected = match table_selected {
_ if flows.len() == 0 => 0,
i if i == 0 => flows.len() - 1,
i if i < 10 => 0,
i => i - 10,
i if i < 25 => 0,
i => i - 25,
};
},
Some(KeyCode::PageDown) => {
table_selected = match table_selected {
i if flows.len() == 0 || i >= flows.len() - 1 => 0,
i if flows.len() < 10 || i >= flows.len() - 10 => flows.len() - 1,
i => i + 10,
i if flows.len() < 25 || i >= flows.len() - 25 => flows.len() - 1,
i => i + 25,
};
},
Some(KeyCode::Home) => {
@@ -383,20 +515,28 @@ fn parse_json(data: &str) -> Result<EventType, ParseError> {
} else if value.get("packet_event_name").is_some() {
let packet_event: PacketEvent = serde_json::from_value(value)?;
return Ok(EventType::Packet(packet_event));
} else if value.get("daemon_event_name").is_some() ||
value.get("error_event_name").is_some() {
} else if value.get("daemon_event_name").is_some() {
if value.get("daemon_event_name").unwrap() == "status" ||
value.get("daemon_event_name").unwrap() == "shutdown"
{
let daemon_status_event: DaemonEventStatus = serde_json::from_value(value)?;
return Ok(EventType::DaemonStatus(daemon_status_event));
}
return Ok(EventType::Other());
} else if value.get("error_event_name").is_some() {
return Ok(EventType::Other());
}
Err(ParseError::Schema())
}
async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache<FlowKey, (Expiration, FlowValue)>) {
async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache<FlowKey, (FlowExpiration, FlowValue)>, daemon_cache: &Cache<DaemonKey, DaemonEventStatus>) {
match &event {
EventType::Flow(flow_event) => {
stats.flow_events += 1;
stats.flow_count = cache.entry_count();
let key = FlowKey { id: flow_event.id };
let key = FlowKey { id: flow_event.id, alias: flow_event.alias.to_string(),
source: flow_event.source.to_string(), thread_id: flow_event.thread_id };
if flow_event.name == EventName::End ||
flow_event.name == EventName::Idle
@@ -422,6 +562,19 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach
let timeout_epoch = std::time::Duration::new(timeout_seconds, timeout_nanos as u32);
let timeout_system = UNIX_EPOCH + timeout_epoch;
let risks = match &flow_event.ndpi {
None => 0,
Some(ndpi) => match &ndpi.risks {
None => 0,
Some(risks) => risks.len(),
},
};
let app_proto = match &flow_event.ndpi {
None => "-",
Some(ndpi) => &ndpi.proto,
};
let value = FlowValue {
state: flow_event.state,
total_src_packets: flow_event.src_packets_processed,
@@ -431,14 +584,58 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach
first_seen: first_seen_system,
last_seen: last_seen_system,
timeout_in: timeout_system,
risks: risks,
proto: flow_event.l3_proto.to_string() + "/" + &flow_event.l4_proto,
app_proto: app_proto.to_string(),
};
cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await;
cache.insert(key, (FlowExpiration::IdleTime(flow_event.idle_time), value)).await;
}
EventType::Packet(packet_event) => {
stats.packet_events += 1;
stats.total_caplen += packet_event.pkt_caplen;
stats.total_len += packet_event.pkt_len;
stats.total_l4_len += packet_event.pkt_l4_len;
stats.packet_events_total_caplen += packet_event.pkt_caplen;
stats.packet_events_total_len += packet_event.pkt_len;
stats.packet_events_total_l4_len += packet_event.pkt_l4_len;
}
EventType::DaemonStatus(daemon_status_event) => {
let key = DaemonKey { alias: daemon_status_event.alias.to_string(),
source: daemon_status_event.source.to_string(),
thread_id: daemon_status_event.thread_id };
stats.daemon_events += 1;
daemon_cache.insert(key, daemon_status_event.clone()).await;
stats.packets_captured = 0;
stats.packets_processed = 0;
stats.flows_total_skipped = 0;
stats.flows_total_l4_payload_len = 0;
stats.flows_total_not_detected = 0;
stats.flows_total_guessed = 0;
stats.flows_current_active = 0;
stats.flows_total_compressions = 0;
stats.flows_total_compression_diff = 0;
stats.flows_current_compression_diff = 0;
stats.global_alloc_bytes = 0;
stats.global_alloc_count = 0;
stats.global_free_bytes = 0;
stats.global_free_count = 0;
stats.total_events_serialized = 0;
let daemons: Vec<DaemonEventStatus> = daemon_cache.iter().map(|(_, v)| (v.clone())).collect();
for daemon in daemons {
stats.packets_captured += daemon.packets_captured;
stats.packets_processed += daemon.packets_processed;
stats.flows_total_skipped += daemon.total_skipped_flows;
stats.flows_total_l4_payload_len += daemon.total_l4_payload_len;
stats.flows_total_not_detected += daemon.total_not_detected_flows;
stats.flows_total_guessed += daemon.total_guessed_flows;
stats.flows_current_active += daemon.current_active_flows;
stats.flows_total_compressions += daemon.total_compressions;
stats.flows_total_compression_diff += daemon.total_compression_diff;
stats.flows_current_compression_diff += daemon.current_compression_diff;
stats.global_alloc_bytes += daemon.global_alloc_bytes;
stats.global_alloc_count += daemon.global_alloc_count;
stats.global_free_bytes += daemon.global_free_bytes;
stats.global_free_count += daemon.global_free_count;
stats.total_events_serialized += daemon.total_events_serialized;
}
}
EventType::Other() => {}
}
@@ -460,19 +657,32 @@ fn format_bytes(bytes: u64) -> String {
}
}
fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (Expiration, FlowValue))>) {
fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (FlowExpiration, FlowValue))>) {
let general_items = vec![
ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()),
ListItem::new("Flows Cached.: ".to_owned() + &data.flow_count.to_string()),
ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()),
ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()),
ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()),
ListItem::new("Packet Events: ".to_owned() + &data.packet_events.to_string()),
];
let packet_items = vec![
ListItem::new("Total Capture Length: ".to_owned() + &format_bytes(data.total_caplen)),
ListItem::new("Total Length........: ".to_owned() + &format_bytes(data.total_len)),
ListItem::new("Total L4 Length.....: ".to_owned() + &format_bytes(data.total_l4_len)),
ListItem::new("Total Events........: ".to_owned() + &data.packet_events.to_string()),
ListItem::new("Total Capture Length: ".to_owned() + &format_bytes(data.packet_events_total_caplen)),
ListItem::new("Total Length........: ".to_owned() + &format_bytes(data.packet_events_total_len)),
ListItem::new("Total L4 Length.....: ".to_owned() + &format_bytes(data.packet_events_total_l4_len)),
];
let daemon_items = vec![
ListItem::new("Total Events.............: ".to_owned() + &data.daemon_events.to_string()),
ListItem::new("Total Packets Captured...: ".to_owned() + &data.packets_captured.to_string()),
ListItem::new("Total Packets Processed..: ".to_owned() + &data.packets_processed.to_string()),
ListItem::new("Total Flows Skipped......: ".to_owned() + &data.flows_total_skipped.to_string()),
ListItem::new("Total Flows Not-Detected.: ".to_owned() + &data.flows_total_not_detected.to_string()),
ListItem::new("Total Compressions/Memory: ".to_owned() + &data.flows_total_compressions.to_string()
+ " / " + &format_bytes(data.flows_total_compression_diff) + " deflate"),
ListItem::new("Total Memory in Use......: ".to_owned() + &format_bytes(data.global_alloc_bytes - data.global_free_bytes)
+ " (" + &format_bytes(data.flows_current_compression_diff) + " deflate)"),
ListItem::new("Total Events Serialized..: ".to_owned() + &data.total_events_serialized.to_string()),
ListItem::new("Current Flows Active.....: ".to_owned() + &data.flows_current_active.to_string()),
];
let table_rows: Vec<Row> = flows
.into_iter()
@@ -527,6 +737,9 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &m
timeout_display,
(val.total_src_packets + val.total_dst_packets).to_string(),
format_bytes(val.total_src_bytes + val.total_dst_bytes),
val.risks.to_string(),
val.proto.to_string(),
val.app_proto.to_string(),
])
})
.collect();
@@ -538,8 +751,8 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &m
.direction(Direction::Vertical)
.constraints(
[
Constraint::Percentage(18),
Constraint::Percentage(82),
Constraint::Length(11),
Constraint::Percentage(100),
].as_ref()
)
.split(size);
@@ -548,8 +761,9 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &m
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage(50),
Constraint::Percentage(50),
Constraint::Percentage(25),
Constraint::Percentage(30),
Constraint::Percentage(55),
].as_ref()
)
.split(chunks[0]);
@@ -559,7 +773,7 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &m
i => i + 1,
};
let table = Table::new(table_rows)
.header(Row::new(vec!["Flow ID", "State", "First Seen", "Last Seen", "Timeout", "Total Packets", "Total Bytes"])
.header(Row::new(vec!["Flow ID", "State", "First Seen", "Last Seen", "Timeout", "Total Packets", "Total Bytes", "Risks", "L3/L4", "L7"])
.style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)))
.block(Block::default().title("Flow Table (selected: ".to_string() +
&table_selected_abs.to_string() +
@@ -568,23 +782,29 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &m
" item(s)").borders(Borders::ALL))
.highlight_style(Style::default().bg(Color::Blue))
.widths(&[
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(20),
Constraint::Length(10),
Constraint::Length(10),
Constraint::Length(12),
Constraint::Length(12),
Constraint::Length(10),
Constraint::Length(13),
Constraint::Length(12),
Constraint::Length(6),
Constraint::Length(12),
Constraint::Length(15),
]);
let general_list = List::new(general_items)
.block(Block::default().title("General").borders(Borders::ALL));
let packet_list = List::new(packet_items)
.block(Block::default().title("Packet Events").borders(Borders::ALL));
let daemon_list = List::new(daemon_items)
.block(Block::default().title("Daemon Events").borders(Borders::ALL));
table_state.select(Some(table_selected));
f.render_widget(general_list, top_chunks[0]);
f.render_widget(packet_list, top_chunks[1]);
f.render_widget(daemon_list, top_chunks[2]);
f.render_stateful_widget(table, chunks[1], table_state);
}).unwrap();
}