mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
feat(gateway): add flow-logs MVP (#10576)
Network flow logs are a common feature of VPNs. Due to the nature of a shared exit node, it is of great interest to a network analyst, which TCP connections are getting routed through the tunnel, who is initiating them, for long do they last and how much traffic is sent across them. With this PR, the Firezone Gateway gains the ability of detecting the TCP and UDP flows that are being routed through it. The information we want to attach to these flows is spread out over several layers of the packet handling code. To simplify the implementation and not complicate the APIs unnecessarily, we chose to rely on TLS (thread-local storage) for gathering all the necessary data as a packet gets passed through the various layers. When using a const initializer, the overhead of a TLS variable over an actual local variable is basically zero. The entire routing state of the Gateway is also never sent across any threads, making TLS variables a particularly good choice for this problem. In its MVP form, the detected flows are only emitted on stdout and also that only if `flow_logs=trace` is set using `RUST_LOG`. Early adopters of this feature are encouraged to enable these logs as described and then ingest the Gateway's logs into the SIEM of their choice for further analysis. Related: #8353
This commit is contained in:
26
.github/workflows/_integration_tests.yml
vendored
26
.github/workflows/_integration_tests.yml
vendored
@@ -103,6 +103,8 @@ jobs:
|
||||
test:
|
||||
- script: create-flow-from-icmp-error
|
||||
min_client_version: 1.5.4
|
||||
- script: download-rst
|
||||
min_gateway_version: 1.4.18
|
||||
- script: curl-api-down
|
||||
- script: curl-api-restart
|
||||
- script: curl-ecn
|
||||
@@ -113,31 +115,43 @@ jobs:
|
||||
- name: dns-systemd-resolved
|
||||
script: systemd/dns-systemd-resolved
|
||||
- script: tcp-dns
|
||||
# Setting both client and gateway to random masquerade will force relay-relay candidate pair
|
||||
- script: download-concurrent
|
||||
min_gateway_version: 1.4.18
|
||||
- name: download-double-symmetric-nat
|
||||
script: download
|
||||
# Setting both client and gateway to random masquerade will force relay-relay candidate pair
|
||||
client_masquerade: random
|
||||
gateway_masquerade: random
|
||||
rust_log: debug
|
||||
rust_log: debug,flow_logs=trace
|
||||
single_relay: true # Force single relay
|
||||
min_gateway_version: 1.4.18
|
||||
- script: download-packet-loss
|
||||
rust_log: debug
|
||||
- script: download-roaming-network
|
||||
# Too noisy can cause flaky tests due to the amount of data
|
||||
rust_log: debug
|
||||
min_gateway_version: 1.4.18
|
||||
rust_log: debug,flow_logs=trace # Too noisy can cause flaky tests due to the amount of data
|
||||
steps:
|
||||
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
|
||||
- uses: ./.github/actions/ghcr-docker-login
|
||||
with:
|
||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Check minimum client version
|
||||
id: version_check
|
||||
id: client_version_check
|
||||
if: ${{ matrix.test.min_client_version }}
|
||||
continue-on-error: true
|
||||
run: |
|
||||
ACTUAL_VERSION=$(docker run ${{ inputs.client_image }}:${{ inputs.client_tag }} firezone-headless-client --version | awk '{print $2}')
|
||||
MIN_VERSION="${{ matrix.test.min_client_version }}"
|
||||
|
||||
[ "$(printf '%s\n' "$MIN_VERSION" "$ACTUAL_VERSION" | sort --version-sort | head -n1)" == "$MIN_VERSION" ]
|
||||
- name: Check minimum gateway version
|
||||
id: gateway_version_check
|
||||
if: ${{ matrix.test.min_gateway_version }}
|
||||
continue-on-error: true
|
||||
run: |
|
||||
ACTUAL_VERSION=$(docker run ${{ inputs.gateway_image }}:${{ inputs.gateway_tag }} firezone-gateway --version | awk '{print $2}')
|
||||
MIN_VERSION="${{ matrix.test.min_gateway_version }}"
|
||||
|
||||
[ "$(printf '%s\n' "$MIN_VERSION" "$ACTUAL_VERSION" | sort --version-sort | head -n1)" == "$MIN_VERSION" ]
|
||||
# We need at least Docker v28.1 which is not yet available on GitHub actions runners
|
||||
- uses: docker/setup-docker-action@b60f85385d03ac8acfca6d9996982511d8620a19 # v4.3.0
|
||||
@@ -189,7 +203,7 @@ jobs:
|
||||
sudo ethtool -K docker0 tx off
|
||||
|
||||
- run: ./scripts/tests/${{ matrix.test.script }}.sh
|
||||
if: ${{ steps.version_check.outcome != 'failure' }} # Run the script if version check succeeds or is skipped
|
||||
if: ${{ steps.client_version_check.outcome != 'failure' && steps.gateway_version_check.outcome != 'failure' }} # Run the script if version checks succeed or are skipped
|
||||
|
||||
- name: Ensure Client emitted no warnings
|
||||
if: "!cancelled()"
|
||||
|
||||
@@ -242,7 +242,7 @@ services:
|
||||
<<: *health-check
|
||||
environment:
|
||||
FIREZONE_TOKEN: ".SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkMjI3NDU2MGItZTk3Yi00NWU0LThiMzQtNjc5Yzc2MTdlOThkbQAAADhPMDJMN1VTMkozVklOT01QUjlKNklMODhRSVFQNlVPOEFRVk82VTVJUEwwVkpDMjJKR0gwPT09PW4GAAH8sImUAWIAAVGA.tAm2O9FcyF67VAF3rZdwQpeADrYOIs3S2l2K51G26OM"
|
||||
RUST_LOG: ${RUST_LOG:-wire=trace,debug}
|
||||
RUST_LOG: ${RUST_LOG:-wire=trace,debug,flow_logs=trace}
|
||||
FIREZONE_API_URL: ws://api:8081
|
||||
FIREZONE_ID: 4694E56C-7643-4A15-9DF3-638E5B05F570
|
||||
command:
|
||||
|
||||
@@ -354,6 +354,26 @@ impl IpPacket {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn layer4_payload_len(&self) -> usize {
|
||||
if let Some(tcp) = self.as_tcp() {
|
||||
return tcp.payload().len();
|
||||
}
|
||||
|
||||
if let Some(udp) = self.as_udp() {
|
||||
return udp.payload().len();
|
||||
}
|
||||
|
||||
if let Some(icmpv4) = self.as_icmpv4() {
|
||||
return icmpv4.payload().len();
|
||||
}
|
||||
|
||||
if let Some(icmpv6) = self.as_icmpv6() {
|
||||
return icmpv6.payload().len();
|
||||
}
|
||||
|
||||
self.payload().len()
|
||||
}
|
||||
|
||||
pub fn set_source_protocol(&mut self, v: u16) {
|
||||
if let Some(mut p) = self.as_tcp_mut() {
|
||||
p.set_source_port(v);
|
||||
|
||||
@@ -17,7 +17,7 @@ bufferpool = { workspace = true }
|
||||
bytes = { workspace = true, features = ["std"] }
|
||||
chrono = { workspace = true }
|
||||
connlib-model = { workspace = true }
|
||||
derive_more = { workspace = true, features = ["debug"] }
|
||||
derive_more = { workspace = true, features = ["debug", "from"] }
|
||||
divan = { workspace = true, optional = true }
|
||||
dns-over-tcp = { workspace = true }
|
||||
dns-types = { workspace = true }
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
mod client_on_gateway;
|
||||
mod filter_engine;
|
||||
mod flow_tracker;
|
||||
mod nat_table;
|
||||
|
||||
pub(crate) use crate::gateway::client_on_gateway::ClientOnGateway;
|
||||
|
||||
use crate::gateway::client_on_gateway::TranslateOutboundResult;
|
||||
use crate::gateway::flow_tracker::FlowTracker;
|
||||
use crate::messages::gateway::ResourceDescription;
|
||||
use crate::messages::{Answer, IceCredentials, ResolveRequest, SecretKey};
|
||||
use crate::peer_store::PeerStore;
|
||||
@@ -38,6 +40,8 @@ pub struct GatewayState {
|
||||
/// All clients we are connected to and the associated, connection-specific state.
|
||||
peers: PeerStore<ClientId, ClientOnGateway>,
|
||||
|
||||
flow_tracker: FlowTracker,
|
||||
|
||||
/// When to next check whether a resource-access policy has expired.
|
||||
next_expiry_resources_check: Option<Instant>,
|
||||
|
||||
@@ -72,6 +76,7 @@ impl GatewayState {
|
||||
next_expiry_resources_check: Default::default(),
|
||||
buffered_events: VecDeque::default(),
|
||||
buffered_transmits: VecDeque::default(),
|
||||
flow_tracker: FlowTracker::new(now),
|
||||
tun_ip_config: None,
|
||||
}
|
||||
}
|
||||
@@ -98,6 +103,8 @@ impl GatewayState {
|
||||
packet: IpPacket,
|
||||
now: Instant,
|
||||
) -> Result<Option<snownet::Transmit>> {
|
||||
let _guard = self.flow_tracker.new_inbound_tun(&packet, now);
|
||||
|
||||
if packet.is_fz_p2p_control() {
|
||||
tracing::warn!("Packet matches heuristics of FZ p2p control protocol");
|
||||
}
|
||||
@@ -114,6 +121,8 @@ impl GatewayState {
|
||||
};
|
||||
let cid = peer.id();
|
||||
|
||||
flow_tracker::inbound_tun::record_client(cid);
|
||||
|
||||
let Some(packet) = peer
|
||||
.translate_inbound(packet, now)
|
||||
.context("Failed to translate inbound packet")?
|
||||
@@ -129,6 +138,11 @@ impl GatewayState {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
flow_tracker::inbound_tun::record_wireguard_packet(
|
||||
encrypted_packet.src,
|
||||
encrypted_packet.dst,
|
||||
);
|
||||
|
||||
Ok(Some(encrypted_packet))
|
||||
}
|
||||
|
||||
@@ -145,6 +159,8 @@ impl GatewayState {
|
||||
packet: &[u8],
|
||||
now: Instant,
|
||||
) -> Result<Option<IpPacket>> {
|
||||
let _guard = self.flow_tracker.new_inbound_wireguard(local, from, now);
|
||||
|
||||
let Some((cid, packet)) = self
|
||||
.node
|
||||
.decapsulate(local, from, packet, now)
|
||||
@@ -153,6 +169,9 @@ impl GatewayState {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
flow_tracker::inbound_wg::record_client(cid);
|
||||
flow_tracker::inbound_wg::record_decrypted_packet(&packet);
|
||||
|
||||
let peer = self
|
||||
.peers
|
||||
.get_mut(&cid)
|
||||
@@ -194,9 +213,15 @@ impl GatewayState {
|
||||
.translate_outbound(packet, now)
|
||||
.context("Failed to translate outbound packet")?
|
||||
{
|
||||
TranslateOutboundResult::Send(ip_packet) => Ok(Some(ip_packet)),
|
||||
TranslateOutboundResult::Send(packet) => {
|
||||
flow_tracker::inbound_wg::record_translated_packet(&packet);
|
||||
|
||||
Ok(Some(packet))
|
||||
}
|
||||
TranslateOutboundResult::DestinationUnreachable(reply)
|
||||
| TranslateOutboundResult::Filtered(reply) => {
|
||||
flow_tracker::inbound_wg::record_icmp_error(&reply);
|
||||
|
||||
let Some(transmit) = encrypt_packet(reply, cid, &mut self.node, now)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -412,6 +437,7 @@ impl GatewayState {
|
||||
pub fn handle_timeout(&mut self, now: Instant, utc_now: DateTime<Utc>) {
|
||||
self.node.handle_timeout(now);
|
||||
self.drain_node_events();
|
||||
self.flow_tracker.handle_timeout(now);
|
||||
|
||||
match self.next_expiry_resources_check {
|
||||
Some(next_expiry_resources_check) if now >= next_expiry_resources_check => {
|
||||
@@ -432,6 +458,65 @@ impl GatewayState {
|
||||
None => self.next_expiry_resources_check = Some(now + EXPIRE_RESOURCES_INTERVAL),
|
||||
Some(_) => {}
|
||||
}
|
||||
|
||||
while let Some(flow) = self.flow_tracker.poll_completed_flow() {
|
||||
match flow {
|
||||
flow_tracker::CompletedFlow::Tcp(flow) => {
|
||||
tracing::trace!(
|
||||
target: "flow_logs::tcp",
|
||||
|
||||
client = %flow.client,
|
||||
resource = %flow.resource,
|
||||
start = ?flow.start,
|
||||
end = ?flow.end,
|
||||
last_packet = ?flow.last_packet,
|
||||
|
||||
inner_src_ip = %flow.inner_src_ip,
|
||||
inner_dst_ip = %flow.inner_dst_ip,
|
||||
inner_src_port = %flow.inner_src_port,
|
||||
inner_dst_port = %flow.inner_dst_port,
|
||||
|
||||
outer_src_ip = %flow.outer_src_ip,
|
||||
outer_dst_ip = %flow.outer_dst_ip,
|
||||
outer_src_port = %flow.outer_src_port,
|
||||
outer_dst_port = %flow.outer_dst_port,
|
||||
|
||||
rx_packets = %flow.rx_packets,
|
||||
tx_packets = %flow.tx_packets,
|
||||
rx_bytes = %flow.rx_bytes,
|
||||
tx_bytes = %flow.tx_bytes,
|
||||
"TCP flow completed"
|
||||
);
|
||||
}
|
||||
flow_tracker::CompletedFlow::Udp(flow) => {
|
||||
tracing::trace!(
|
||||
target: "flow_logs::udp",
|
||||
|
||||
client = %flow.client,
|
||||
resource = %flow.resource,
|
||||
start = ?flow.start,
|
||||
end = ?flow.end,
|
||||
last_packet = ?flow.last_packet,
|
||||
|
||||
inner_src_ip = %flow.inner_src_ip,
|
||||
inner_dst_ip = %flow.inner_dst_ip,
|
||||
inner_src_port = %flow.inner_src_port,
|
||||
inner_dst_port = %flow.inner_dst_port,
|
||||
|
||||
outer_src_ip = %flow.outer_src_ip,
|
||||
outer_dst_ip = %flow.outer_dst_ip,
|
||||
outer_src_port = %flow.outer_src_port,
|
||||
outer_dst_port = %flow.outer_dst_port,
|
||||
|
||||
rx_packets = %flow.rx_packets,
|
||||
tx_packets = %flow.tx_packets,
|
||||
rx_bytes = %flow.rx_bytes,
|
||||
tx_bytes = %flow.tx_bytes,
|
||||
"UDP flow completed"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_node_events(&mut self) {
|
||||
|
||||
@@ -13,6 +13,7 @@ use ip_packet::{IpPacket, Protocol, UnsupportedProtocol};
|
||||
|
||||
use crate::client::{IPV4_RESOURCES, IPV6_RESOURCES};
|
||||
use crate::gateway::filter_engine::FilterEngine;
|
||||
use crate::gateway::flow_tracker;
|
||||
use crate::gateway::nat_table::{NatTable, TranslateIncomingResult};
|
||||
use crate::messages::gateway::Filters;
|
||||
use crate::messages::gateway::ResourceDescription;
|
||||
@@ -319,7 +320,7 @@ impl ClientOnGateway {
|
||||
now: Instant,
|
||||
) -> anyhow::Result<TranslateOutboundResult> {
|
||||
// Filtering a packet is not an error.
|
||||
if let Err(e) = self.ensure_allowed_src_and_dst(&packet) {
|
||||
if let Err(e) = self.ensure_allowed_outbound(&packet) {
|
||||
tracing::debug!(filtered_packet = ?packet, "{e:#}");
|
||||
return Ok(TranslateOutboundResult::Filtered(
|
||||
ip_packet::make::icmp_dest_unreachable_prohibited(&packet)?,
|
||||
@@ -355,21 +356,24 @@ impl ClientOnGateway {
|
||||
return Ok(Some(packet));
|
||||
}
|
||||
|
||||
if let Err(e) = self.classify_resource(packet.source(), packet.source_protocol()) {
|
||||
tracing::debug!(
|
||||
"Inbound packet is not allowed, perhaps from an old client session? error = {e:#}"
|
||||
);
|
||||
|
||||
self.num_dropped_packets.add(
|
||||
1,
|
||||
&[
|
||||
otel::attr::network_type_for_packet(&packet),
|
||||
otel::attr::network_io_direction_receive(),
|
||||
otel::attr::error_type(e.root_cause().to_string()),
|
||||
],
|
||||
);
|
||||
|
||||
return Ok(None);
|
||||
match self.classify_resource(packet.source(), packet.source_protocol()) {
|
||||
Ok(rid) => {
|
||||
flow_tracker::inbound_tun::record_resource(rid);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
"Inbound packet is not allowed, perhaps from an old client session? error = {e:#}"
|
||||
);
|
||||
self.num_dropped_packets.add(
|
||||
1,
|
||||
&[
|
||||
otel::attr::network_type_for_packet(&packet),
|
||||
otel::attr::network_io_direction_receive(),
|
||||
otel::attr::error_type(e.root_cause().to_string()),
|
||||
],
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(packet))
|
||||
@@ -476,7 +480,7 @@ impl ClientOnGateway {
|
||||
self.resources.contains_key(&resource)
|
||||
}
|
||||
|
||||
fn ensure_allowed_src_and_dst(&self, packet: &IpPacket) -> anyhow::Result<()> {
|
||||
fn ensure_allowed_outbound(&self, packet: &IpPacket) -> anyhow::Result<()> {
|
||||
self.ensure_client_ip(packet.source())?;
|
||||
|
||||
// Traffic to our own IP is allowed.
|
||||
@@ -484,7 +488,9 @@ impl ClientOnGateway {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.classify_resource(packet.destination(), packet.destination_protocol())?;
|
||||
let rid = self.classify_resource(packet.destination(), packet.destination_protocol())?;
|
||||
|
||||
flow_tracker::inbound_wg::record_resource(rid);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
839
rust/connlib/tunnel/src/gateway/flow_tracker.rs
Normal file
839
rust/connlib/tunnel/src/gateway/flow_tracker.rs
Normal file
@@ -0,0 +1,839 @@
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
collections::{HashMap, VecDeque, hash_map},
|
||||
net::{IpAddr, SocketAddr},
|
||||
sync::LazyLock,
|
||||
};
|
||||
|
||||
use chrono::{DateTime, TimeDelta, Utc};
|
||||
use connlib_model::{ClientId, ResourceId};
|
||||
use ip_packet::{IcmpError, IpPacket, Protocol, UnsupportedProtocol};
|
||||
use std::time::Instant;
|
||||
|
||||
thread_local! {
|
||||
static CURRENT_FLOW: RefCell<Option<FlowData>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
// Workaround because `tracing::enabled!` is broken.
|
||||
static IS_ENABLED: LazyLock<bool> = LazyLock::new(|| {
|
||||
std::env::var("RUST_LOG").is_ok_and(|directives| directives.contains("flow_logs=trace"))
|
||||
});
|
||||
|
||||
const FLOW_TIMEOUT: TimeDelta = TimeDelta::minutes(2);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FlowTracker {
|
||||
active_tcp_flows: HashMap<TcpFlowKey, TcpFlowValue>,
|
||||
active_udp_flows: HashMap<UdpFlowKey, UdpFlowValue>,
|
||||
|
||||
completed_flows: VecDeque<CompletedFlow>,
|
||||
|
||||
created_at: Instant,
|
||||
created_at_utc: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl FlowTracker {
|
||||
pub fn new(now: Instant) -> Self {
|
||||
Self {
|
||||
active_tcp_flows: Default::default(),
|
||||
active_udp_flows: Default::default(),
|
||||
completed_flows: Default::default(),
|
||||
created_at: now,
|
||||
created_at_utc: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_inbound_tun<'a>(
|
||||
&'a mut self,
|
||||
packet: &IpPacket,
|
||||
now: Instant,
|
||||
) -> CurrentFlowGuard<'a> {
|
||||
if !*IS_ENABLED {
|
||||
return CurrentFlowGuard {
|
||||
inner: self,
|
||||
created_at: now,
|
||||
};
|
||||
}
|
||||
|
||||
let current = CURRENT_FLOW.replace(Some(FlowData::InboundTun(InboundTun {
|
||||
inner: InnerFlow::from(packet),
|
||||
outer: None,
|
||||
client: None,
|
||||
resource: None,
|
||||
})));
|
||||
debug_assert!(
|
||||
current.is_none(),
|
||||
"at most 1 flow should be active at any time"
|
||||
);
|
||||
|
||||
CurrentFlowGuard {
|
||||
inner: self,
|
||||
created_at: now,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_inbound_wireguard<'a>(
|
||||
&'a mut self,
|
||||
local: SocketAddr,
|
||||
remote: SocketAddr,
|
||||
now: Instant,
|
||||
) -> CurrentFlowGuard<'a> {
|
||||
if !*IS_ENABLED {
|
||||
return CurrentFlowGuard {
|
||||
inner: self,
|
||||
created_at: now,
|
||||
};
|
||||
}
|
||||
|
||||
let current = CURRENT_FLOW.replace(Some(FlowData::InboundWireGuard(InboundWireGuard {
|
||||
outer: OuterFlow { local, remote },
|
||||
inner: None,
|
||||
client: None,
|
||||
resource: None,
|
||||
icmp_error: None,
|
||||
})));
|
||||
debug_assert!(
|
||||
current.is_none(),
|
||||
"at most 1 flow should be active at any time"
|
||||
);
|
||||
|
||||
CurrentFlowGuard {
|
||||
inner: self,
|
||||
created_at: now,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_completed_flow(&mut self) -> Option<CompletedFlow> {
|
||||
self.completed_flows.pop_front()
|
||||
}
|
||||
|
||||
pub fn handle_timeout(&mut self, now: Instant) {
|
||||
let now_utc = self.now_utc(now);
|
||||
|
||||
for (key, value) in self
|
||||
.active_tcp_flows
|
||||
.extract_if(|_, value| now_utc.signed_duration_since(value.last_packet) > FLOW_TIMEOUT)
|
||||
{
|
||||
let flow = CompletedTcpFlow::new(key, value, now_utc);
|
||||
|
||||
tracing::debug!(?flow, "Terminating TCP flow; timeout");
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
}
|
||||
|
||||
for (key, value) in self
|
||||
.active_udp_flows
|
||||
.extract_if(|_, value| now_utc.signed_duration_since(value.last_packet) > FLOW_TIMEOUT)
|
||||
{
|
||||
let flow = CompletedUdpFlow::new(key, value, now_utc);
|
||||
|
||||
tracing::debug!(?flow, "Terminating UDP flow; timeout");
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
}
|
||||
|
||||
for (key, value) in self
|
||||
.active_tcp_flows
|
||||
.extract_if(|_, value| value.fin_rx && value.fin_tx)
|
||||
{
|
||||
let end = value.last_packet;
|
||||
let flow = CompletedTcpFlow::new(key, value, end);
|
||||
|
||||
tracing::debug!(?flow, "Terminating TCP flow; FIN sent & received");
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_inbound_wireguard_flow(&mut self, flow: InboundWireGuard, now: Instant) {
|
||||
let InboundWireGuard {
|
||||
outer,
|
||||
inner:
|
||||
Some(InnerFlow {
|
||||
src_ip,
|
||||
dst_ip,
|
||||
src_proto: Ok(src_proto),
|
||||
dst_proto: Ok(dst_proto),
|
||||
tcp_syn,
|
||||
tcp_fin,
|
||||
tcp_rst,
|
||||
payload_len,
|
||||
}),
|
||||
client: Some(client),
|
||||
resource: Some(resource),
|
||||
icmp_error: _, // TODO: What to do with ICMP errors?
|
||||
} = flow
|
||||
else {
|
||||
tracing::trace!(?flow, "Cannot create flow with missing data");
|
||||
|
||||
return;
|
||||
};
|
||||
let now_utc = self.now_utc(now);
|
||||
let context = FlowContext {
|
||||
src_ip: outer.remote.ip(),
|
||||
dst_ip: outer.local.ip(),
|
||||
src_port: outer.remote.port(),
|
||||
dst_port: outer.local.port(),
|
||||
};
|
||||
|
||||
match (src_proto, dst_proto) {
|
||||
(Protocol::Tcp(src_port), Protocol::Tcp(dst_port)) => {
|
||||
let key = TcpFlowKey {
|
||||
client,
|
||||
resource,
|
||||
src_ip,
|
||||
dst_ip,
|
||||
src_port,
|
||||
dst_port,
|
||||
};
|
||||
|
||||
match self.active_tcp_flows.entry(key) {
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
if tcp_fin || tcp_rst {
|
||||
// Don't create new flows for FIN/RST packets.
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::debug!(key = ?vacant.key(), ?context, syn = %tcp_syn, "Creating new TCP flow");
|
||||
|
||||
vacant.insert(TcpFlowValue {
|
||||
start: now_utc,
|
||||
last_packet: now_utc,
|
||||
stats: FlowStats::default().with_tx(payload_len as u64),
|
||||
context,
|
||||
fin_tx: false,
|
||||
fin_rx: false,
|
||||
});
|
||||
}
|
||||
hash_map::Entry::Occupied(occupied) if occupied.get().context != context => {
|
||||
let (key, value) = occupied.remove_entry();
|
||||
let context_diff = FlowContextDiff::new(value.context, context);
|
||||
|
||||
tracing::debug!(
|
||||
?key,
|
||||
?context_diff,
|
||||
"Splitting existing TCP flow; context changed"
|
||||
);
|
||||
|
||||
let flow = CompletedTcpFlow::new(key, value, now_utc);
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
|
||||
self.active_tcp_flows.insert(
|
||||
key,
|
||||
TcpFlowValue {
|
||||
start: now_utc,
|
||||
last_packet: now_utc,
|
||||
stats: FlowStats::default().with_tx(payload_len as u64),
|
||||
context,
|
||||
fin_tx: false,
|
||||
fin_rx: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
hash_map::Entry::Occupied(occupied) if tcp_syn => {
|
||||
let (key, value) = occupied.remove_entry();
|
||||
|
||||
tracing::debug!(?key, "Splitting existing TCP flow; new TCP SYN");
|
||||
|
||||
let flow = CompletedTcpFlow::new(key, value, now_utc);
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
|
||||
self.active_tcp_flows.insert(
|
||||
key,
|
||||
TcpFlowValue {
|
||||
start: now_utc,
|
||||
last_packet: now_utc,
|
||||
stats: FlowStats::default().with_tx(payload_len as u64),
|
||||
context,
|
||||
fin_tx: false,
|
||||
fin_rx: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
hash_map::Entry::Occupied(mut occupied) => {
|
||||
let value = occupied.get_mut();
|
||||
|
||||
value.stats.inc_tx(payload_len as u64);
|
||||
value.last_packet = now_utc;
|
||||
if tcp_fin {
|
||||
value.fin_tx = true;
|
||||
}
|
||||
|
||||
if tcp_rst {
|
||||
let (key, value) = occupied.remove_entry();
|
||||
let flow = CompletedTcpFlow::new(key, value, now_utc);
|
||||
|
||||
tracing::debug!(?flow, "TCP flow completed on outbound RST");
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
(Protocol::Udp(src_port), Protocol::Udp(dst_port)) => {
|
||||
let key = UdpFlowKey {
|
||||
client,
|
||||
resource,
|
||||
src_ip,
|
||||
dst_ip,
|
||||
src_port,
|
||||
dst_port,
|
||||
};
|
||||
|
||||
match self.active_udp_flows.entry(key) {
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
tracing::debug!(key = ?vacant.key(), "Creating new UDP flow");
|
||||
|
||||
vacant.insert(UdpFlowValue {
|
||||
start: now_utc,
|
||||
last_packet: now_utc,
|
||||
stats: FlowStats::default().with_tx(payload_len as u64),
|
||||
context,
|
||||
});
|
||||
}
|
||||
hash_map::Entry::Occupied(occupied) if occupied.get().context != context => {
|
||||
let (key, value) = occupied.remove_entry();
|
||||
let context_diff = FlowContextDiff::new(value.context, context);
|
||||
|
||||
let flow = CompletedUdpFlow::new(key, value, now_utc);
|
||||
|
||||
tracing::debug!(
|
||||
?key,
|
||||
?context_diff,
|
||||
"Splitting existing UDP flow; context changed"
|
||||
);
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
|
||||
self.active_udp_flows.insert(
|
||||
key,
|
||||
UdpFlowValue {
|
||||
start: now_utc,
|
||||
last_packet: now_utc,
|
||||
stats: FlowStats::default().with_tx(payload_len as u64),
|
||||
context,
|
||||
},
|
||||
);
|
||||
}
|
||||
hash_map::Entry::Occupied(mut occupied) => {
|
||||
let value = occupied.get_mut();
|
||||
|
||||
value.stats.inc_tx(payload_len as u64);
|
||||
value.last_packet = now_utc;
|
||||
}
|
||||
};
|
||||
}
|
||||
(Protocol::Icmp(_), Protocol::Icmp(_)) => {}
|
||||
_ => {
|
||||
tracing::error!("src and dst protocol must be the same");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_inbound_tun_flow(&mut self, flow: InboundTun, now: Instant) {
|
||||
let InboundTun {
|
||||
inner:
|
||||
InnerFlow {
|
||||
src_ip,
|
||||
dst_ip,
|
||||
src_proto: Ok(src_proto),
|
||||
dst_proto: Ok(dst_proto),
|
||||
tcp_syn: _,
|
||||
tcp_fin,
|
||||
tcp_rst,
|
||||
payload_len,
|
||||
},
|
||||
outer: Some(_),
|
||||
client: Some(client),
|
||||
resource: Some(resource),
|
||||
} = flow
|
||||
else {
|
||||
tracing::trace!(?flow, "Cannot create flow with missing data");
|
||||
|
||||
return;
|
||||
};
|
||||
let now_utc = self.now_utc(now);
|
||||
|
||||
match (src_proto, dst_proto) {
|
||||
(Protocol::Tcp(src_port), Protocol::Tcp(dst_port)) => {
|
||||
// For packets inbound from the TUN device, we need to flip src & dst.
|
||||
let key = TcpFlowKey {
|
||||
client,
|
||||
resource,
|
||||
src_ip: dst_ip,
|
||||
dst_ip: src_ip,
|
||||
src_port: dst_port,
|
||||
dst_port: src_port,
|
||||
};
|
||||
|
||||
match self.active_tcp_flows.entry(key) {
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
if tcp_fin || tcp_rst {
|
||||
// Don't care about FIN/RST packets where the flow no longer exists.
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::debug!(key = ?vacant.key(), "No existing TCP flow for packet inbound on TUN device");
|
||||
}
|
||||
hash_map::Entry::Occupied(mut occupied) => {
|
||||
let value = occupied.get_mut();
|
||||
value.stats.inc_rx(payload_len as u64);
|
||||
value.last_packet = now_utc;
|
||||
|
||||
if tcp_fin {
|
||||
value.fin_rx = true;
|
||||
}
|
||||
|
||||
if tcp_rst {
|
||||
let (key, value) = occupied.remove_entry();
|
||||
let flow = CompletedTcpFlow::new(key, value, now_utc);
|
||||
|
||||
tracing::debug!(?flow, "TCP flow completed on inbound RST");
|
||||
|
||||
self.completed_flows.push_back(flow.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
(Protocol::Udp(src_port), Protocol::Udp(dst_port)) => {
|
||||
// For packets inbound from the TUN device, we need to flip src & dst.
|
||||
let key = UdpFlowKey {
|
||||
client,
|
||||
resource,
|
||||
src_ip: dst_ip,
|
||||
dst_ip: src_ip,
|
||||
src_port: dst_port,
|
||||
dst_port: src_port,
|
||||
};
|
||||
|
||||
match self.active_udp_flows.entry(key) {
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
tracing::debug!(key = ?vacant.key(), "No existing UDP flow for packet inbound on TUN device");
|
||||
}
|
||||
hash_map::Entry::Occupied(mut occupied) => {
|
||||
let value = occupied.get_mut();
|
||||
value.stats.inc_rx(payload_len as u64);
|
||||
value.last_packet = now_utc;
|
||||
}
|
||||
};
|
||||
}
|
||||
(Protocol::Icmp(_), Protocol::Icmp(_)) => {}
|
||||
_ => {
|
||||
tracing::error!("src and dst protocol must be the same");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn now_utc(&self, now: Instant) -> DateTime<Utc> {
|
||||
self.created_at_utc + now.duration_since(self.created_at)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, derive_more::From)]
|
||||
pub enum CompletedFlow {
|
||||
Tcp(CompletedTcpFlow),
|
||||
Udp(CompletedUdpFlow),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CompletedTcpFlow {
|
||||
pub client: ClientId,
|
||||
pub resource: ResourceId,
|
||||
pub start: DateTime<Utc>,
|
||||
pub end: DateTime<Utc>,
|
||||
pub last_packet: DateTime<Utc>,
|
||||
|
||||
pub inner_src_ip: IpAddr,
|
||||
pub inner_dst_ip: IpAddr,
|
||||
pub inner_src_port: u16,
|
||||
pub inner_dst_port: u16,
|
||||
|
||||
pub outer_src_ip: IpAddr,
|
||||
pub outer_dst_ip: IpAddr,
|
||||
pub outer_src_port: u16,
|
||||
pub outer_dst_port: u16,
|
||||
|
||||
pub rx_packets: u64,
|
||||
pub tx_packets: u64,
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CompletedUdpFlow {
|
||||
pub client: ClientId,
|
||||
pub resource: ResourceId,
|
||||
pub start: DateTime<Utc>,
|
||||
pub end: DateTime<Utc>,
|
||||
pub last_packet: DateTime<Utc>,
|
||||
|
||||
pub inner_src_ip: IpAddr,
|
||||
pub inner_dst_ip: IpAddr,
|
||||
pub inner_src_port: u16,
|
||||
pub inner_dst_port: u16,
|
||||
|
||||
pub outer_src_ip: IpAddr,
|
||||
pub outer_dst_ip: IpAddr,
|
||||
pub outer_src_port: u16,
|
||||
pub outer_dst_port: u16,
|
||||
|
||||
pub rx_packets: u64,
|
||||
pub tx_packets: u64,
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
}
|
||||
|
||||
impl CompletedTcpFlow {
|
||||
fn new(key: TcpFlowKey, value: TcpFlowValue, end: DateTime<Utc>) -> Self {
|
||||
Self {
|
||||
client: key.client,
|
||||
resource: key.resource,
|
||||
start: value.start,
|
||||
end,
|
||||
last_packet: value.last_packet,
|
||||
inner_src_ip: key.src_ip,
|
||||
inner_dst_ip: key.dst_ip,
|
||||
inner_src_port: key.src_port,
|
||||
inner_dst_port: key.dst_port,
|
||||
outer_src_ip: value.context.src_ip,
|
||||
outer_dst_ip: value.context.dst_ip,
|
||||
outer_src_port: value.context.src_port,
|
||||
outer_dst_port: value.context.dst_port,
|
||||
rx_packets: value.stats.rx_packets,
|
||||
tx_packets: value.stats.tx_packets,
|
||||
rx_bytes: value.stats.rx_bytes,
|
||||
tx_bytes: value.stats.tx_bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CompletedUdpFlow {
|
||||
fn new(key: UdpFlowKey, value: UdpFlowValue, end: DateTime<Utc>) -> Self {
|
||||
Self {
|
||||
client: key.client,
|
||||
resource: key.resource,
|
||||
start: value.start,
|
||||
end,
|
||||
last_packet: value.last_packet,
|
||||
inner_src_ip: key.src_ip,
|
||||
inner_dst_ip: key.dst_ip,
|
||||
inner_src_port: key.src_port,
|
||||
inner_dst_port: key.dst_port,
|
||||
outer_src_ip: value.context.src_ip,
|
||||
outer_dst_ip: value.context.dst_ip,
|
||||
outer_src_port: value.context.src_port,
|
||||
outer_dst_port: value.context.dst_port,
|
||||
rx_packets: value.stats.rx_packets,
|
||||
tx_packets: value.stats.tx_packets,
|
||||
rx_bytes: value.stats.rx_bytes,
|
||||
tx_bytes: value.stats.tx_bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
|
||||
struct TcpFlowKey {
|
||||
client: ClientId,
|
||||
resource: ResourceId,
|
||||
src_ip: IpAddr,
|
||||
dst_ip: IpAddr,
|
||||
src_port: u16,
|
||||
dst_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
|
||||
struct UdpFlowKey {
|
||||
client: ClientId,
|
||||
resource: ResourceId,
|
||||
src_ip: IpAddr,
|
||||
dst_ip: IpAddr,
|
||||
src_port: u16,
|
||||
dst_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TcpFlowValue {
|
||||
start: DateTime<Utc>,
|
||||
last_packet: DateTime<Utc>,
|
||||
stats: FlowStats,
|
||||
context: FlowContext,
|
||||
|
||||
fin_tx: bool,
|
||||
fin_rx: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UdpFlowValue {
|
||||
start: DateTime<Utc>,
|
||||
last_packet: DateTime<Utc>,
|
||||
stats: FlowStats,
|
||||
context: FlowContext,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct FlowStats {
|
||||
rx_packets: u64,
|
||||
tx_packets: u64,
|
||||
rx_bytes: u64,
|
||||
tx_bytes: u64,
|
||||
}
|
||||
|
||||
impl FlowStats {
|
||||
fn with_tx(mut self, payload_len: u64) -> Self {
|
||||
self.inc_tx(payload_len);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
fn inc_tx(&mut self, payload_len: u64) {
|
||||
self.tx_packets += 1;
|
||||
self.tx_bytes += payload_len;
|
||||
}
|
||||
|
||||
fn inc_rx(&mut self, payload_len: u64) {
|
||||
self.rx_packets += 1;
|
||||
self.rx_bytes += payload_len;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
struct FlowContext {
|
||||
src_ip: IpAddr,
|
||||
dst_ip: IpAddr,
|
||||
src_port: u16,
|
||||
dst_port: u16,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq)]
|
||||
struct FlowContextDiff {
|
||||
src_ip: Option<(IpAddr, IpAddr)>,
|
||||
dst_ip: Option<(IpAddr, IpAddr)>,
|
||||
src_port: Option<(u16, u16)>,
|
||||
dst_port: Option<(u16, u16)>,
|
||||
}
|
||||
|
||||
impl FlowContextDiff {
|
||||
fn new(old: FlowContext, new: FlowContext) -> Self {
|
||||
let src_ip_diff = (old.src_ip != new.src_ip).then_some((old.src_ip, new.src_ip));
|
||||
let dst_ip_diff = (old.dst_ip != new.dst_ip).then_some((old.dst_ip, new.dst_ip));
|
||||
let src_port_diff = (old.src_port != new.src_port).then_some((old.src_port, new.src_port));
|
||||
let dst_port_diff = (old.dst_port != new.dst_port).then_some((old.dst_port, new.dst_port));
|
||||
|
||||
Self {
|
||||
src_ip: src_ip_diff,
|
||||
dst_ip: dst_ip_diff,
|
||||
src_port: src_port_diff,
|
||||
dst_port: dst_port_diff,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FlowContextDiff {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let mut debug_struct = f.debug_struct("FlowContextDiff");
|
||||
|
||||
if let Some((old, new)) = self.src_ip {
|
||||
debug_struct
|
||||
.field("old_src_ip", &old)
|
||||
.field("new_src_ip", &new);
|
||||
}
|
||||
if let Some((old, new)) = self.dst_ip {
|
||||
debug_struct
|
||||
.field("old_dst_ip", &old)
|
||||
.field("new_dst_ip", &new);
|
||||
}
|
||||
if let Some((old, new)) = self.src_port {
|
||||
debug_struct
|
||||
.field("old_src_port", &old)
|
||||
.field("new_src_port", &new);
|
||||
}
|
||||
if let Some((old, new)) = self.dst_port {
|
||||
debug_struct
|
||||
.field("old_dst_port", &old)
|
||||
.field("new_dst_port", &new);
|
||||
}
|
||||
|
||||
debug_struct.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub mod inbound_wg {
|
||||
use super::*;
|
||||
|
||||
pub fn record_client(cid: ClientId) {
|
||||
update_current_flow_inbound_wireguard(|wg| wg.client.replace(cid));
|
||||
}
|
||||
|
||||
pub fn record_resource(rid: ResourceId) {
|
||||
update_current_flow_inbound_wireguard(|wg| wg.resource.replace(rid));
|
||||
}
|
||||
|
||||
pub fn record_decrypted_packet(packet: &IpPacket) {
|
||||
update_current_flow_inbound_wireguard(|wg| {
|
||||
wg.inner = Some(InnerFlow::from(packet));
|
||||
});
|
||||
}
|
||||
|
||||
pub fn record_translated_packet(packet: &IpPacket) {
|
||||
update_current_flow_inbound_wireguard(|wg| {
|
||||
let Some(inner) = wg.inner.as_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
inner.dst_ip = packet.destination();
|
||||
});
|
||||
}
|
||||
|
||||
pub fn record_icmp_error(packet: &IpPacket) {
|
||||
let Ok(Some((_, icmp_error))) = packet.icmp_error() else {
|
||||
return;
|
||||
};
|
||||
|
||||
update_current_flow_inbound_wireguard(|wg| wg.icmp_error.replace(icmp_error));
|
||||
}
|
||||
}
|
||||
|
||||
pub mod inbound_tun {
|
||||
use super::*;
|
||||
|
||||
pub fn record_client(cid: ClientId) {
|
||||
update_current_flow_inbound_tun(|tun| tun.client.replace(cid));
|
||||
}
|
||||
|
||||
pub fn record_resource(rid: ResourceId) {
|
||||
update_current_flow_inbound_tun(|tun| tun.resource.replace(rid));
|
||||
}
|
||||
|
||||
pub fn record_wireguard_packet(local: Option<SocketAddr>, remote: SocketAddr) {
|
||||
update_current_flow_inbound_tun(|tun| tun.outer = Some(OuterFlow { local, remote }));
|
||||
}
|
||||
}
|
||||
|
||||
fn update_current_flow_inbound_wireguard<R>(f: impl FnOnce(&mut InboundWireGuard) -> R) {
|
||||
CURRENT_FLOW.with_borrow_mut(|c| {
|
||||
let Some(FlowData::InboundWireGuard(wg)) = c else {
|
||||
return;
|
||||
};
|
||||
|
||||
f(wg);
|
||||
});
|
||||
}
|
||||
|
||||
fn update_current_flow_inbound_tun<R>(f: impl FnOnce(&mut InboundTun) -> R) {
|
||||
CURRENT_FLOW.with_borrow_mut(|c| {
|
||||
let Some(FlowData::InboundTun(tun)) = c else {
|
||||
return;
|
||||
};
|
||||
|
||||
f(tun);
|
||||
});
|
||||
}
|
||||
|
||||
pub struct CurrentFlowGuard<'a> {
|
||||
inner: &'a mut FlowTracker,
|
||||
created_at: Instant,
|
||||
}
|
||||
|
||||
impl<'a> Drop for CurrentFlowGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
let Some(current_flow) = CURRENT_FLOW.replace(None) else {
|
||||
return;
|
||||
};
|
||||
|
||||
match current_flow {
|
||||
FlowData::InboundWireGuard(flow) => self
|
||||
.inner
|
||||
.insert_inbound_wireguard_flow(flow, self.created_at),
|
||||
FlowData::InboundTun(flow) => self.inner.insert_inbound_tun_flow(flow, self.created_at),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum FlowData {
|
||||
InboundWireGuard(InboundWireGuard),
|
||||
InboundTun(InboundTun),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InboundWireGuard {
|
||||
outer: OuterFlow<SocketAddr>,
|
||||
inner: Option<InnerFlow>,
|
||||
client: Option<ClientId>,
|
||||
resource: Option<ResourceId>,
|
||||
icmp_error: Option<IcmpError>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InboundTun {
|
||||
inner: InnerFlow,
|
||||
outer: Option<OuterFlow<Option<SocketAddr>>>,
|
||||
client: Option<ClientId>,
|
||||
resource: Option<ResourceId>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OuterFlow<L> {
|
||||
local: L,
|
||||
remote: SocketAddr,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InnerFlow {
|
||||
src_ip: IpAddr,
|
||||
dst_ip: IpAddr,
|
||||
src_proto: Result<Protocol, UnsupportedProtocol>,
|
||||
dst_proto: Result<Protocol, UnsupportedProtocol>,
|
||||
|
||||
tcp_syn: bool,
|
||||
tcp_fin: bool,
|
||||
tcp_rst: bool,
|
||||
|
||||
payload_len: usize,
|
||||
}
|
||||
|
||||
impl From<&IpPacket> for InnerFlow {
|
||||
fn from(packet: &IpPacket) -> Self {
|
||||
InnerFlow {
|
||||
src_ip: packet.source(),
|
||||
dst_ip: packet.destination(),
|
||||
src_proto: packet.source_protocol(),
|
||||
dst_proto: packet.destination_protocol(),
|
||||
tcp_syn: packet.as_tcp().map(|tcp| tcp.syn()).unwrap_or(false),
|
||||
tcp_fin: packet.as_tcp().map(|tcp| tcp.fin()).unwrap_or(false),
|
||||
tcp_rst: packet.as_tcp().map(|tcp| tcp.rst()).unwrap_or(false),
|
||||
payload_len: packet.layer4_payload_len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn flow_context_diff_rendering() {
|
||||
let old = FlowContext {
|
||||
src_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
|
||||
dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
|
||||
src_port: 8080,
|
||||
dst_port: 443,
|
||||
};
|
||||
let new = FlowContext {
|
||||
src_ip: IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)),
|
||||
dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
|
||||
src_port: 50000,
|
||||
dst_port: 443,
|
||||
};
|
||||
|
||||
let diff = FlowContextDiff::new(old, new);
|
||||
|
||||
assert_eq!(
|
||||
"FlowContextDiff { old_src_ip: 10.0.0.1, new_src_ip: 1.1.1.1, old_src_port: 8080, new_src_port: 50000 }",
|
||||
format!("{diff:?}")
|
||||
);
|
||||
}
|
||||
}
|
||||
37
scripts/tests/download-concurrent.sh
Executable file
37
scripts/tests/download-concurrent.sh
Executable file
@@ -0,0 +1,37 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
source "./scripts/tests/lib.sh"
|
||||
|
||||
client sh -c "curl --fail --max-time 10 --output /tmp/download1.file http://download.httpbin/bytes?num=5000000" &
|
||||
PID1=$!
|
||||
|
||||
client sh -c "curl --fail --max-time 10 --output /tmp/download2.file http://download.httpbin/bytes?num=5000000" &
|
||||
PID2=$!
|
||||
|
||||
client sh -c "curl --fail --max-time 10 --output /tmp/download3.file http://download.httpbin/bytes?num=5000000" &
|
||||
PID3=$!
|
||||
|
||||
wait $PID1 || {
|
||||
echo "Download 1 failed"
|
||||
exit 1
|
||||
}
|
||||
|
||||
wait $PID2 || {
|
||||
echo "Download 2 failed"
|
||||
exit 1
|
||||
}
|
||||
|
||||
wait $PID3 || {
|
||||
echo "Download 3 failed"
|
||||
exit 1
|
||||
}
|
||||
|
||||
sleep 3
|
||||
readarray -t flows < <(get_flow_logs "tcp")
|
||||
|
||||
assert_eq "${#flows[@]}" 3
|
||||
|
||||
for flow in "${flows[@]}"; do
|
||||
assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101"
|
||||
assert_gteq "$(get_flow_field "$flow" "rx_bytes")" 5000000
|
||||
done
|
||||
@@ -43,3 +43,24 @@ if [[ "$computed_checksum" != "$known_checksum" ]]; then
|
||||
echo "Checksum of downloaded file does not match"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
sleep 3
|
||||
readarray -t flows < <(get_flow_logs "tcp")
|
||||
|
||||
assert_gteq "${#flows[@]}" 2
|
||||
|
||||
# All flows should have same inner_dst_ip
|
||||
for flow in "${flows[@]}"; do
|
||||
assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101"
|
||||
done
|
||||
|
||||
# Verify different outer_src_port after roaming (network change)
|
||||
# The docker-compose setup uses routers and therefore the source IP is always the router.
|
||||
# But conntrack on the router will allocate a new source port because the binding on the old one is still active after roaming.
|
||||
original_src_port=$(get_flow_field "${flows[0]}" "outer_src_port")
|
||||
|
||||
for ((i = 1; i < ${#flows[@]}; i++)); do
|
||||
next_src_port=$(get_flow_field "${flows[i]}" "outer_src_port")
|
||||
|
||||
assert_ne "$original_src_port" "$next_src_port"
|
||||
done
|
||||
|
||||
24
scripts/tests/download-rst.sh
Executable file
24
scripts/tests/download-rst.sh
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
source "./scripts/tests/lib.sh"
|
||||
|
||||
# 2 seconds are not enough at the given speed to download the file, curl will therefore abort and RST the connection.
|
||||
client sh -c "curl --max-time 2 --limit-rate 1000000 --no-keepalive --parallel-max 1 --output /dev/null http://download.httpbin/bytes?num=100000000" &
|
||||
DOWNLOAD_PID=$!
|
||||
|
||||
wait $DOWNLOAD_PID || true # The download fails but we want to continue.
|
||||
|
||||
sleep 3
|
||||
readarray -t flows < <(get_flow_logs "tcp")
|
||||
|
||||
assert_gteq "${#flows[@]}" 1
|
||||
|
||||
rx_bytes=0
|
||||
|
||||
# All flows should have same inner_dst_ip
|
||||
for flow in "${flows[@]}"; do
|
||||
assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101"
|
||||
rx_bytes+="$(get_flow_field "$flow" "rx_bytes")"
|
||||
done
|
||||
|
||||
assert_gteq "$rx_bytes" 2000000
|
||||
@@ -18,3 +18,12 @@ if [[ "$computed_checksum" != "$known_checksum" ]]; then
|
||||
echo "Checksum of downloaded file does not match"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
sleep 3
|
||||
readarray -t flows < <(get_flow_logs "tcp")
|
||||
|
||||
assert_eq "${#flows[@]}" 1
|
||||
|
||||
flow="${flows[0]}"
|
||||
assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101"
|
||||
assert_gteq "$(get_flow_field "$flow" "rx_bytes")" 10000000
|
||||
|
||||
@@ -50,7 +50,7 @@ Domain.PubSub.Account.broadcast(account_id, {{:reject_access, gateway_id}, clien
|
||||
"
|
||||
}
|
||||
|
||||
function assert_equals() {
|
||||
function assert_eq() {
|
||||
local actual="$1"
|
||||
local expected="$2"
|
||||
|
||||
@@ -60,6 +60,26 @@ function assert_equals() {
|
||||
fi
|
||||
}
|
||||
|
||||
function assert_ne() {
|
||||
local actual="$1"
|
||||
local expected="$2"
|
||||
|
||||
if [[ "$expected" == "$actual" ]]; then
|
||||
echo "Expected values to differ but both are $actual"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
function assert_gteq() {
|
||||
local actual="$1"
|
||||
local expected="$2"
|
||||
|
||||
if [ "$actual" -lt "$expected" ]; then
|
||||
echo "Expected $actual to be greater than or equal to $expected"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
function process_state() {
|
||||
local container="$1"
|
||||
|
||||
@@ -70,7 +90,7 @@ function assert_process_state {
|
||||
local container="$1"
|
||||
local expected_state="$2"
|
||||
|
||||
assert_equals "$(process_state "$container")" "$expected_state"
|
||||
assert_eq "$(process_state "$container")" "$expected_state"
|
||||
}
|
||||
|
||||
function create_token_file {
|
||||
@@ -96,3 +116,23 @@ function expect_error() {
|
||||
return 0
|
||||
fi
|
||||
}
|
||||
|
||||
# Extract flow logs from gateway for a given protocol
|
||||
# Returns flow log lines (use with readarray)
|
||||
# Usage: readarray -t flows < <(get_flow_logs "tcp")
|
||||
function get_flow_logs() {
|
||||
local protocol="$1"
|
||||
|
||||
docker compose logs gateway --since 30s 2>/dev/null |
|
||||
grep "flow_logs::${protocol}.*flow completed" || true
|
||||
}
|
||||
|
||||
# Extract a field value from a flow log line
|
||||
# Usage: get_flow_field <flow_log_line> <field_name>
|
||||
# Example: get_flow_field "$flow" "inner_dst_ip"
|
||||
function get_flow_field() {
|
||||
local flow_log="$1"
|
||||
local field_name="$2"
|
||||
|
||||
echo "$flow_log" | grep -oP "${field_name}=\K[^ ]+" || echo ""
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user