mirror of
https://github.com/outbackdingo/nDPId.git
synced 2026-01-27 10:19:45 +00:00
rs-simple: make primitive flow table work
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
@@ -9,10 +9,11 @@ use moka::{future::Cache, Expiry};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::{
|
||||
fmt,
|
||||
hash::{Hash, Hasher},
|
||||
io::self,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
time::{Duration, Instant, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -22,9 +23,9 @@ use tokio::net::TcpStream;
|
||||
use tui::{
|
||||
backend::CrosstermBackend,
|
||||
layout::{Layout, Constraint, Direction},
|
||||
style::{Style, Color},
|
||||
style::{Style, Color, Modifier},
|
||||
Terminal,
|
||||
widgets::{Block, Borders, List, ListItem},
|
||||
widgets::{Block, Borders, List, ListItem, Row, Table, TableState},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -51,10 +52,10 @@ enum EventName {
|
||||
NotDetected,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[derive(Serialize, Deserialize, Copy, Clone, Debug)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum State {
|
||||
Info, Finished,
|
||||
Unknown, Info, Finished,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -65,6 +66,12 @@ struct FlowEvent {
|
||||
id: u64,
|
||||
#[serde(rename = "flow_state")]
|
||||
state: State,
|
||||
#[serde(rename = "flow_first_seen")]
|
||||
first_seen: u64,
|
||||
#[serde(rename = "flow_src_last_pkt_time")]
|
||||
src_last_pkt_time: u64,
|
||||
#[serde(rename = "flow_dst_last_pkt_time")]
|
||||
dst_last_pkt_time: u64,
|
||||
#[serde(rename = "flow_idle_time")]
|
||||
idle_time: u64,
|
||||
#[serde(rename = "flow_src_packets_processed")]
|
||||
@@ -112,13 +119,35 @@ enum Expiration {
|
||||
|
||||
struct FlowExpiry;
|
||||
|
||||
#[derive(Eq, Default, Debug)]
|
||||
#[derive(Clone, Eq, Default, Debug)]
|
||||
struct FlowKey {
|
||||
id: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
struct FlowValue {
|
||||
state: State,
|
||||
total_src_packets: u64,
|
||||
total_dst_packets: u64,
|
||||
total_src_bytes: u64,
|
||||
total_dst_bytes: u64,
|
||||
first_seen: (u64, u64, u64, u64),
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
fn default() -> State {
|
||||
State::Unknown
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for State {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
State::Unknown => write!(f, "N/A"),
|
||||
State::Info => write!(f, "Info"),
|
||||
State::Finished => write!(f, "Finished"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Expiration {
|
||||
@@ -129,6 +158,18 @@ impl Expiration {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Expiration {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self.as_duration() {
|
||||
Some(duration) => {
|
||||
let secs = duration.as_secs();
|
||||
write!(f, "{} s", secs)
|
||||
}
|
||||
None => write!(f, "N/A"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry {
|
||||
fn expire_after_create(
|
||||
&self,
|
||||
@@ -184,13 +225,12 @@ async fn main() {
|
||||
let data = Arc::new(Mutex::new(Stats::default()));
|
||||
let data_tx = Arc::clone(&data);
|
||||
let data_rx = Arc::clone(&data);
|
||||
let flow_cache: Arc<Cache<FlowKey, (Expiration, FlowValue)>> = Arc::new(Cache::builder()
|
||||
.expire_after(FlowExpiry)
|
||||
.build());
|
||||
let flow_cache_rx = Arc::clone(&flow_cache);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let expiry = FlowExpiry;
|
||||
let flow_cache: Cache<FlowKey, (Expiration, FlowValue)> = Cache::builder()
|
||||
.expire_after(expiry)
|
||||
.build();
|
||||
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match parse_json(&msg) {
|
||||
Ok(message) => {
|
||||
@@ -226,16 +266,45 @@ async fn main() {
|
||||
}
|
||||
});
|
||||
|
||||
let mut table_state = TableState::default();
|
||||
|
||||
loop {
|
||||
let flows: Vec<(FlowKey, (Expiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect();
|
||||
let mut table_selected = match table_state.selected() {
|
||||
Some(table_index) => {
|
||||
if flows.len() > 0 && table_index >= flows.len() {
|
||||
flows.len() - 1
|
||||
} else {
|
||||
table_index
|
||||
}
|
||||
}
|
||||
None => 0,
|
||||
};
|
||||
|
||||
match read_keypress() {
|
||||
Some(KeyCode::Esc) => break,
|
||||
Some(KeyCode::Char('q')) => break,
|
||||
Some(KeyCode::Up) => {
|
||||
table_selected = match table_selected {
|
||||
i if i == 0 && flows.len() == 0 => 0,
|
||||
i if i == 0 => flows.len() - 1,
|
||||
i => i - 1,
|
||||
};
|
||||
},
|
||||
Some(KeyCode::Down) => {
|
||||
table_selected = match table_selected {
|
||||
i if flows.len() == 0 || i >= flows.len() - 1 => 0,
|
||||
i => i + 1,
|
||||
};
|
||||
},
|
||||
Some(KeyCode::Enter) => break,
|
||||
Some(_) => (),
|
||||
None => ()
|
||||
};
|
||||
|
||||
let mut data_lock = data_rx.lock().await;
|
||||
data_lock.ui_updates += 1;
|
||||
draw_ui(terminal.as_mut().unwrap(), &data_lock);
|
||||
draw_ui(terminal.as_mut().unwrap(), &mut table_state, table_selected, &data_lock, &flows);
|
||||
}
|
||||
|
||||
if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) {
|
||||
@@ -303,8 +372,30 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach
|
||||
stats.flow_events += 1;
|
||||
stats.flow_count = cache.entry_count();
|
||||
|
||||
let first_seen_seconds = flow_event.first_seen / 1_000_000;
|
||||
let first_seen_nanos = (flow_event.first_seen % 1_000_000) * 1_000;
|
||||
let first_seen_epoch = std::time::Duration::new(first_seen_seconds, first_seen_nanos as u32);
|
||||
let first_seen_system = UNIX_EPOCH + first_seen_epoch;
|
||||
let time_tuple = match first_seen_system.elapsed() {
|
||||
Ok(elapsed) => {
|
||||
let seconds = elapsed.as_secs();
|
||||
let minutes = seconds / 60;
|
||||
let hours = minutes / 60;
|
||||
let days = hours / 24;
|
||||
(seconds, minutes, hours, days)
|
||||
}
|
||||
Err(_) => (0, 0, 0, 0)
|
||||
};
|
||||
|
||||
let key = FlowKey { id: flow_event.id };
|
||||
let value = FlowValue {};
|
||||
let value = FlowValue {
|
||||
state: flow_event.state,
|
||||
total_src_packets: flow_event.src_packets_processed,
|
||||
total_dst_packets: flow_event.dst_packets_processed,
|
||||
total_src_bytes: flow_event.src_tot_l4_payload_len,
|
||||
total_dst_bytes: flow_event.dst_tot_l4_payload_len,
|
||||
first_seen: time_tuple,
|
||||
};
|
||||
cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await;
|
||||
}
|
||||
EventType::Packet(packet_event) => {
|
||||
@@ -317,10 +408,10 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach
|
||||
}
|
||||
}
|
||||
|
||||
fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGuard<Stats>) {
|
||||
fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (Expiration, FlowValue))>) {
|
||||
let general_items = vec![
|
||||
ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()),
|
||||
ListItem::new("Flow Count...: ".to_owned() + &data.flow_count.to_string()),
|
||||
ListItem::new("Flows Cached.: ".to_owned() + &data.flow_count.to_string()),
|
||||
ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()),
|
||||
ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()),
|
||||
ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()),
|
||||
@@ -331,6 +422,29 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGua
|
||||
ListItem::new("Total Length........: ".to_owned() + &data.total_len.to_string()),
|
||||
ListItem::new("Total L4 Length.....: ".to_owned() + &data.total_l4_len.to_string()),
|
||||
];
|
||||
let table_rows: Vec<Row> = flows
|
||||
.into_iter()
|
||||
.map(|(key, (exp, val))| {
|
||||
let first_seen_display = match (val.first_seen.0, val.first_seen.1,
|
||||
val.first_seen.2, val.first_seen.3)
|
||||
{
|
||||
(_, _, _, d) if d > 0 => format!("{} day(s) ago", d),
|
||||
(_, _, h, _) if h > 0 => format!("{} hour(s) ago", h),
|
||||
(_, m, _, _) if m > 0 => format!("{} min(s) ago", m),
|
||||
(s, _, _, _) if s > 0 => format!("{} sec(s) ago", s),
|
||||
_ => format!("{} sec(s) ago", val.first_seen.0),
|
||||
};
|
||||
|
||||
Row::new(vec![
|
||||
key.id.to_string(),
|
||||
val.state.to_string(),
|
||||
first_seen_display,
|
||||
exp.to_string(),
|
||||
(val.total_src_packets + val.total_dst_packets).to_string(),
|
||||
(val.total_src_bytes + val.total_dst_bytes).to_string(),
|
||||
])
|
||||
})
|
||||
.collect();
|
||||
|
||||
terminal.draw(|f| {
|
||||
let size = f.size();
|
||||
@@ -355,18 +469,36 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGua
|
||||
)
|
||||
.split(chunks[0]);
|
||||
|
||||
let right_block = Block::default()
|
||||
.title("Unused Bottom Panel")
|
||||
.borders(Borders::ALL)
|
||||
.style(Style::default().fg(Color::Black).bg(Color::Green));
|
||||
let table_selected_abs = match table_selected {
|
||||
_ if flows.len() == 0 => 0,
|
||||
i => i + 1,
|
||||
};
|
||||
let table = Table::new(table_rows)
|
||||
.header(Row::new(vec!["Flow ID", "State", "First Seen", "Timeout", "Total Packets", "Total Bytes"])
|
||||
.style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)))
|
||||
.block(Block::default().title("Flow Table (selected: ".to_string() +
|
||||
&table_selected_abs.to_string() +
|
||||
"): " +
|
||||
&flows.len().to_string() +
|
||||
" item(s)").borders(Borders::ALL))
|
||||
.highlight_style(Style::default().bg(Color::Blue))
|
||||
.widths(&[
|
||||
Constraint::Length(20),
|
||||
Constraint::Length(20),
|
||||
Constraint::Length(20),
|
||||
Constraint::Length(20),
|
||||
Constraint::Length(20),
|
||||
Constraint::Length(20),
|
||||
]);
|
||||
|
||||
let general_list = List::new(general_items)
|
||||
.block(Block::default().title("General").borders(Borders::ALL));
|
||||
let packet_list = List::new(packet_items)
|
||||
.block(Block::default().title("Packet").borders(Borders::ALL));
|
||||
.block(Block::default().title("Packet Events").borders(Borders::ALL));
|
||||
|
||||
table_state.select(Some(table_selected));
|
||||
f.render_widget(general_list, top_chunks[0]);
|
||||
f.render_widget(packet_list, top_chunks[1]);
|
||||
f.render_widget(right_block, chunks[1]);
|
||||
f.render_stateful_widget(table, chunks[1], table_state);
|
||||
}).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user