diff --git a/.github/workflows/_integration_tests.yml b/.github/workflows/_integration_tests.yml index 2cbbec9b1..c8ba444ed 100644 --- a/.github/workflows/_integration_tests.yml +++ b/.github/workflows/_integration_tests.yml @@ -103,6 +103,8 @@ jobs: test: - script: create-flow-from-icmp-error min_client_version: 1.5.4 + - script: download-rst + min_gateway_version: 1.4.18 - script: curl-api-down - script: curl-api-restart - script: curl-ecn @@ -113,31 +115,43 @@ jobs: - name: dns-systemd-resolved script: systemd/dns-systemd-resolved - script: tcp-dns - # Setting both client and gateway to random masquerade will force relay-relay candidate pair + - script: download-concurrent + min_gateway_version: 1.4.18 - name: download-double-symmetric-nat script: download + # Setting both client and gateway to random masquerade will force relay-relay candidate pair client_masquerade: random gateway_masquerade: random - rust_log: debug + rust_log: debug,flow_logs=trace single_relay: true # Force single relay + min_gateway_version: 1.4.18 - script: download-packet-loss rust_log: debug - script: download-roaming-network - # Too noisy can cause flaky tests due to the amount of data - rust_log: debug + min_gateway_version: 1.4.18 + rust_log: debug,flow_logs=trace # Too noisy can cause flaky tests due to the amount of data steps: - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 - uses: ./.github/actions/ghcr-docker-login with: github_token: ${{ secrets.GITHUB_TOKEN }} - name: Check minimum client version - id: version_check + id: client_version_check if: ${{ matrix.test.min_client_version }} continue-on-error: true run: | ACTUAL_VERSION=$(docker run ${{ inputs.client_image }}:${{ inputs.client_tag }} firezone-headless-client --version | awk '{print $2}') MIN_VERSION="${{ matrix.test.min_client_version }}" + [ "$(printf '%s\n' "$MIN_VERSION" "$ACTUAL_VERSION" | sort --version-sort | head -n1)" == "$MIN_VERSION" ] + - name: Check minimum gateway version + id: gateway_version_check + if: ${{ matrix.test.min_gateway_version }} + continue-on-error: true + run: | + ACTUAL_VERSION=$(docker run ${{ inputs.gateway_image }}:${{ inputs.gateway_tag }} firezone-gateway --version | awk '{print $2}') + MIN_VERSION="${{ matrix.test.min_gateway_version }}" + [ "$(printf '%s\n' "$MIN_VERSION" "$ACTUAL_VERSION" | sort --version-sort | head -n1)" == "$MIN_VERSION" ] # We need at least Docker v28.1 which is not yet available on GitHub actions runners - uses: docker/setup-docker-action@b60f85385d03ac8acfca6d9996982511d8620a19 # v4.3.0 @@ -189,7 +203,7 @@ jobs: sudo ethtool -K docker0 tx off - run: ./scripts/tests/${{ matrix.test.script }}.sh - if: ${{ steps.version_check.outcome != 'failure' }} # Run the script if version check succeeds or is skipped + if: ${{ steps.client_version_check.outcome != 'failure' && steps.gateway_version_check.outcome != 'failure' }} # Run the script if version checks succeed or are skipped - name: Ensure Client emitted no warnings if: "!cancelled()" diff --git a/docker-compose.yml b/docker-compose.yml index 5585c8a56..e3fd3b19c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -242,7 +242,7 @@ services: <<: *health-check environment: FIREZONE_TOKEN: ".SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkMjI3NDU2MGItZTk3Yi00NWU0LThiMzQtNjc5Yzc2MTdlOThkbQAAADhPMDJMN1VTMkozVklOT01QUjlKNklMODhRSVFQNlVPOEFRVk82VTVJUEwwVkpDMjJKR0gwPT09PW4GAAH8sImUAWIAAVGA.tAm2O9FcyF67VAF3rZdwQpeADrYOIs3S2l2K51G26OM" - RUST_LOG: ${RUST_LOG:-wire=trace,debug} + RUST_LOG: ${RUST_LOG:-wire=trace,debug,flow_logs=trace} FIREZONE_API_URL: ws://api:8081 FIREZONE_ID: 4694E56C-7643-4A15-9DF3-638E5B05F570 command: diff --git a/rust/connlib/ip-packet/src/lib.rs b/rust/connlib/ip-packet/src/lib.rs index 047c43d2b..88988196d 100644 --- a/rust/connlib/ip-packet/src/lib.rs +++ b/rust/connlib/ip-packet/src/lib.rs @@ -354,6 +354,26 @@ impl IpPacket { )) } + pub fn layer4_payload_len(&self) -> usize { + if let Some(tcp) = self.as_tcp() { + return tcp.payload().len(); + } + + if let Some(udp) = self.as_udp() { + return udp.payload().len(); + } + + if let Some(icmpv4) = self.as_icmpv4() { + return icmpv4.payload().len(); + } + + if let Some(icmpv6) = self.as_icmpv6() { + return icmpv6.payload().len(); + } + + self.payload().len() + } + pub fn set_source_protocol(&mut self, v: u16) { if let Some(mut p) = self.as_tcp_mut() { p.set_source_port(v); diff --git a/rust/connlib/tunnel/Cargo.toml b/rust/connlib/tunnel/Cargo.toml index 74ed0a2bd..6dbb17ead 100644 --- a/rust/connlib/tunnel/Cargo.toml +++ b/rust/connlib/tunnel/Cargo.toml @@ -17,7 +17,7 @@ bufferpool = { workspace = true } bytes = { workspace = true, features = ["std"] } chrono = { workspace = true } connlib-model = { workspace = true } -derive_more = { workspace = true, features = ["debug"] } +derive_more = { workspace = true, features = ["debug", "from"] } divan = { workspace = true, optional = true } dns-over-tcp = { workspace = true } dns-types = { workspace = true } diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index 60f3f2e2f..333a57abb 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -1,10 +1,12 @@ mod client_on_gateway; mod filter_engine; +mod flow_tracker; mod nat_table; pub(crate) use crate::gateway::client_on_gateway::ClientOnGateway; use crate::gateway::client_on_gateway::TranslateOutboundResult; +use crate::gateway::flow_tracker::FlowTracker; use crate::messages::gateway::ResourceDescription; use crate::messages::{Answer, IceCredentials, ResolveRequest, SecretKey}; use crate::peer_store::PeerStore; @@ -38,6 +40,8 @@ pub struct GatewayState { /// All clients we are connected to and the associated, connection-specific state. peers: PeerStore, + flow_tracker: FlowTracker, + /// When to next check whether a resource-access policy has expired. next_expiry_resources_check: Option, @@ -72,6 +76,7 @@ impl GatewayState { next_expiry_resources_check: Default::default(), buffered_events: VecDeque::default(), buffered_transmits: VecDeque::default(), + flow_tracker: FlowTracker::new(now), tun_ip_config: None, } } @@ -98,6 +103,8 @@ impl GatewayState { packet: IpPacket, now: Instant, ) -> Result> { + let _guard = self.flow_tracker.new_inbound_tun(&packet, now); + if packet.is_fz_p2p_control() { tracing::warn!("Packet matches heuristics of FZ p2p control protocol"); } @@ -114,6 +121,8 @@ impl GatewayState { }; let cid = peer.id(); + flow_tracker::inbound_tun::record_client(cid); + let Some(packet) = peer .translate_inbound(packet, now) .context("Failed to translate inbound packet")? @@ -129,6 +138,11 @@ impl GatewayState { return Ok(None); }; + flow_tracker::inbound_tun::record_wireguard_packet( + encrypted_packet.src, + encrypted_packet.dst, + ); + Ok(Some(encrypted_packet)) } @@ -145,6 +159,8 @@ impl GatewayState { packet: &[u8], now: Instant, ) -> Result> { + let _guard = self.flow_tracker.new_inbound_wireguard(local, from, now); + let Some((cid, packet)) = self .node .decapsulate(local, from, packet, now) @@ -153,6 +169,9 @@ impl GatewayState { return Ok(None); }; + flow_tracker::inbound_wg::record_client(cid); + flow_tracker::inbound_wg::record_decrypted_packet(&packet); + let peer = self .peers .get_mut(&cid) @@ -194,9 +213,15 @@ impl GatewayState { .translate_outbound(packet, now) .context("Failed to translate outbound packet")? { - TranslateOutboundResult::Send(ip_packet) => Ok(Some(ip_packet)), + TranslateOutboundResult::Send(packet) => { + flow_tracker::inbound_wg::record_translated_packet(&packet); + + Ok(Some(packet)) + } TranslateOutboundResult::DestinationUnreachable(reply) | TranslateOutboundResult::Filtered(reply) => { + flow_tracker::inbound_wg::record_icmp_error(&reply); + let Some(transmit) = encrypt_packet(reply, cid, &mut self.node, now)? else { return Ok(None); }; @@ -412,6 +437,7 @@ impl GatewayState { pub fn handle_timeout(&mut self, now: Instant, utc_now: DateTime) { self.node.handle_timeout(now); self.drain_node_events(); + self.flow_tracker.handle_timeout(now); match self.next_expiry_resources_check { Some(next_expiry_resources_check) if now >= next_expiry_resources_check => { @@ -432,6 +458,65 @@ impl GatewayState { None => self.next_expiry_resources_check = Some(now + EXPIRE_RESOURCES_INTERVAL), Some(_) => {} } + + while let Some(flow) = self.flow_tracker.poll_completed_flow() { + match flow { + flow_tracker::CompletedFlow::Tcp(flow) => { + tracing::trace!( + target: "flow_logs::tcp", + + client = %flow.client, + resource = %flow.resource, + start = ?flow.start, + end = ?flow.end, + last_packet = ?flow.last_packet, + + inner_src_ip = %flow.inner_src_ip, + inner_dst_ip = %flow.inner_dst_ip, + inner_src_port = %flow.inner_src_port, + inner_dst_port = %flow.inner_dst_port, + + outer_src_ip = %flow.outer_src_ip, + outer_dst_ip = %flow.outer_dst_ip, + outer_src_port = %flow.outer_src_port, + outer_dst_port = %flow.outer_dst_port, + + rx_packets = %flow.rx_packets, + tx_packets = %flow.tx_packets, + rx_bytes = %flow.rx_bytes, + tx_bytes = %flow.tx_bytes, + "TCP flow completed" + ); + } + flow_tracker::CompletedFlow::Udp(flow) => { + tracing::trace!( + target: "flow_logs::udp", + + client = %flow.client, + resource = %flow.resource, + start = ?flow.start, + end = ?flow.end, + last_packet = ?flow.last_packet, + + inner_src_ip = %flow.inner_src_ip, + inner_dst_ip = %flow.inner_dst_ip, + inner_src_port = %flow.inner_src_port, + inner_dst_port = %flow.inner_dst_port, + + outer_src_ip = %flow.outer_src_ip, + outer_dst_ip = %flow.outer_dst_ip, + outer_src_port = %flow.outer_src_port, + outer_dst_port = %flow.outer_dst_port, + + rx_packets = %flow.rx_packets, + tx_packets = %flow.tx_packets, + rx_bytes = %flow.rx_bytes, + tx_bytes = %flow.tx_bytes, + "UDP flow completed" + ); + } + } + } } fn drain_node_events(&mut self) { diff --git a/rust/connlib/tunnel/src/gateway/client_on_gateway.rs b/rust/connlib/tunnel/src/gateway/client_on_gateway.rs index c5943b5d9..4a3a32e64 100644 --- a/rust/connlib/tunnel/src/gateway/client_on_gateway.rs +++ b/rust/connlib/tunnel/src/gateway/client_on_gateway.rs @@ -13,6 +13,7 @@ use ip_packet::{IpPacket, Protocol, UnsupportedProtocol}; use crate::client::{IPV4_RESOURCES, IPV6_RESOURCES}; use crate::gateway::filter_engine::FilterEngine; +use crate::gateway::flow_tracker; use crate::gateway::nat_table::{NatTable, TranslateIncomingResult}; use crate::messages::gateway::Filters; use crate::messages::gateway::ResourceDescription; @@ -319,7 +320,7 @@ impl ClientOnGateway { now: Instant, ) -> anyhow::Result { // Filtering a packet is not an error. - if let Err(e) = self.ensure_allowed_src_and_dst(&packet) { + if let Err(e) = self.ensure_allowed_outbound(&packet) { tracing::debug!(filtered_packet = ?packet, "{e:#}"); return Ok(TranslateOutboundResult::Filtered( ip_packet::make::icmp_dest_unreachable_prohibited(&packet)?, @@ -355,21 +356,24 @@ impl ClientOnGateway { return Ok(Some(packet)); } - if let Err(e) = self.classify_resource(packet.source(), packet.source_protocol()) { - tracing::debug!( - "Inbound packet is not allowed, perhaps from an old client session? error = {e:#}" - ); - - self.num_dropped_packets.add( - 1, - &[ - otel::attr::network_type_for_packet(&packet), - otel::attr::network_io_direction_receive(), - otel::attr::error_type(e.root_cause().to_string()), - ], - ); - - return Ok(None); + match self.classify_resource(packet.source(), packet.source_protocol()) { + Ok(rid) => { + flow_tracker::inbound_tun::record_resource(rid); + } + Err(e) => { + tracing::debug!( + "Inbound packet is not allowed, perhaps from an old client session? error = {e:#}" + ); + self.num_dropped_packets.add( + 1, + &[ + otel::attr::network_type_for_packet(&packet), + otel::attr::network_io_direction_receive(), + otel::attr::error_type(e.root_cause().to_string()), + ], + ); + return Ok(None); + } } Ok(Some(packet)) @@ -476,7 +480,7 @@ impl ClientOnGateway { self.resources.contains_key(&resource) } - fn ensure_allowed_src_and_dst(&self, packet: &IpPacket) -> anyhow::Result<()> { + fn ensure_allowed_outbound(&self, packet: &IpPacket) -> anyhow::Result<()> { self.ensure_client_ip(packet.source())?; // Traffic to our own IP is allowed. @@ -484,7 +488,9 @@ impl ClientOnGateway { return Ok(()); } - self.classify_resource(packet.destination(), packet.destination_protocol())?; + let rid = self.classify_resource(packet.destination(), packet.destination_protocol())?; + + flow_tracker::inbound_wg::record_resource(rid); Ok(()) } diff --git a/rust/connlib/tunnel/src/gateway/flow_tracker.rs b/rust/connlib/tunnel/src/gateway/flow_tracker.rs new file mode 100644 index 000000000..a2474fb9e --- /dev/null +++ b/rust/connlib/tunnel/src/gateway/flow_tracker.rs @@ -0,0 +1,839 @@ +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque, hash_map}, + net::{IpAddr, SocketAddr}, + sync::LazyLock, +}; + +use chrono::{DateTime, TimeDelta, Utc}; +use connlib_model::{ClientId, ResourceId}; +use ip_packet::{IcmpError, IpPacket, Protocol, UnsupportedProtocol}; +use std::time::Instant; + +thread_local! { + static CURRENT_FLOW: RefCell> = const { RefCell::new(None) }; +} + +// Workaround because `tracing::enabled!` is broken. +static IS_ENABLED: LazyLock = LazyLock::new(|| { + std::env::var("RUST_LOG").is_ok_and(|directives| directives.contains("flow_logs=trace")) +}); + +const FLOW_TIMEOUT: TimeDelta = TimeDelta::minutes(2); + +#[derive(Debug)] +pub struct FlowTracker { + active_tcp_flows: HashMap, + active_udp_flows: HashMap, + + completed_flows: VecDeque, + + created_at: Instant, + created_at_utc: DateTime, +} + +impl FlowTracker { + pub fn new(now: Instant) -> Self { + Self { + active_tcp_flows: Default::default(), + active_udp_flows: Default::default(), + completed_flows: Default::default(), + created_at: now, + created_at_utc: Utc::now(), + } + } + + pub fn new_inbound_tun<'a>( + &'a mut self, + packet: &IpPacket, + now: Instant, + ) -> CurrentFlowGuard<'a> { + if !*IS_ENABLED { + return CurrentFlowGuard { + inner: self, + created_at: now, + }; + } + + let current = CURRENT_FLOW.replace(Some(FlowData::InboundTun(InboundTun { + inner: InnerFlow::from(packet), + outer: None, + client: None, + resource: None, + }))); + debug_assert!( + current.is_none(), + "at most 1 flow should be active at any time" + ); + + CurrentFlowGuard { + inner: self, + created_at: now, + } + } + + pub fn new_inbound_wireguard<'a>( + &'a mut self, + local: SocketAddr, + remote: SocketAddr, + now: Instant, + ) -> CurrentFlowGuard<'a> { + if !*IS_ENABLED { + return CurrentFlowGuard { + inner: self, + created_at: now, + }; + } + + let current = CURRENT_FLOW.replace(Some(FlowData::InboundWireGuard(InboundWireGuard { + outer: OuterFlow { local, remote }, + inner: None, + client: None, + resource: None, + icmp_error: None, + }))); + debug_assert!( + current.is_none(), + "at most 1 flow should be active at any time" + ); + + CurrentFlowGuard { + inner: self, + created_at: now, + } + } + + pub fn poll_completed_flow(&mut self) -> Option { + self.completed_flows.pop_front() + } + + pub fn handle_timeout(&mut self, now: Instant) { + let now_utc = self.now_utc(now); + + for (key, value) in self + .active_tcp_flows + .extract_if(|_, value| now_utc.signed_duration_since(value.last_packet) > FLOW_TIMEOUT) + { + let flow = CompletedTcpFlow::new(key, value, now_utc); + + tracing::debug!(?flow, "Terminating TCP flow; timeout"); + + self.completed_flows.push_back(flow.into()); + } + + for (key, value) in self + .active_udp_flows + .extract_if(|_, value| now_utc.signed_duration_since(value.last_packet) > FLOW_TIMEOUT) + { + let flow = CompletedUdpFlow::new(key, value, now_utc); + + tracing::debug!(?flow, "Terminating UDP flow; timeout"); + + self.completed_flows.push_back(flow.into()); + } + + for (key, value) in self + .active_tcp_flows + .extract_if(|_, value| value.fin_rx && value.fin_tx) + { + let end = value.last_packet; + let flow = CompletedTcpFlow::new(key, value, end); + + tracing::debug!(?flow, "Terminating TCP flow; FIN sent & received"); + + self.completed_flows.push_back(flow.into()); + } + } + + fn insert_inbound_wireguard_flow(&mut self, flow: InboundWireGuard, now: Instant) { + let InboundWireGuard { + outer, + inner: + Some(InnerFlow { + src_ip, + dst_ip, + src_proto: Ok(src_proto), + dst_proto: Ok(dst_proto), + tcp_syn, + tcp_fin, + tcp_rst, + payload_len, + }), + client: Some(client), + resource: Some(resource), + icmp_error: _, // TODO: What to do with ICMP errors? + } = flow + else { + tracing::trace!(?flow, "Cannot create flow with missing data"); + + return; + }; + let now_utc = self.now_utc(now); + let context = FlowContext { + src_ip: outer.remote.ip(), + dst_ip: outer.local.ip(), + src_port: outer.remote.port(), + dst_port: outer.local.port(), + }; + + match (src_proto, dst_proto) { + (Protocol::Tcp(src_port), Protocol::Tcp(dst_port)) => { + let key = TcpFlowKey { + client, + resource, + src_ip, + dst_ip, + src_port, + dst_port, + }; + + match self.active_tcp_flows.entry(key) { + hash_map::Entry::Vacant(vacant) => { + if tcp_fin || tcp_rst { + // Don't create new flows for FIN/RST packets. + return; + } + + tracing::debug!(key = ?vacant.key(), ?context, syn = %tcp_syn, "Creating new TCP flow"); + + vacant.insert(TcpFlowValue { + start: now_utc, + last_packet: now_utc, + stats: FlowStats::default().with_tx(payload_len as u64), + context, + fin_tx: false, + fin_rx: false, + }); + } + hash_map::Entry::Occupied(occupied) if occupied.get().context != context => { + let (key, value) = occupied.remove_entry(); + let context_diff = FlowContextDiff::new(value.context, context); + + tracing::debug!( + ?key, + ?context_diff, + "Splitting existing TCP flow; context changed" + ); + + let flow = CompletedTcpFlow::new(key, value, now_utc); + + self.completed_flows.push_back(flow.into()); + + self.active_tcp_flows.insert( + key, + TcpFlowValue { + start: now_utc, + last_packet: now_utc, + stats: FlowStats::default().with_tx(payload_len as u64), + context, + fin_tx: false, + fin_rx: false, + }, + ); + } + hash_map::Entry::Occupied(occupied) if tcp_syn => { + let (key, value) = occupied.remove_entry(); + + tracing::debug!(?key, "Splitting existing TCP flow; new TCP SYN"); + + let flow = CompletedTcpFlow::new(key, value, now_utc); + + self.completed_flows.push_back(flow.into()); + + self.active_tcp_flows.insert( + key, + TcpFlowValue { + start: now_utc, + last_packet: now_utc, + stats: FlowStats::default().with_tx(payload_len as u64), + context, + fin_tx: false, + fin_rx: false, + }, + ); + } + hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + + value.stats.inc_tx(payload_len as u64); + value.last_packet = now_utc; + if tcp_fin { + value.fin_tx = true; + } + + if tcp_rst { + let (key, value) = occupied.remove_entry(); + let flow = CompletedTcpFlow::new(key, value, now_utc); + + tracing::debug!(?flow, "TCP flow completed on outbound RST"); + + self.completed_flows.push_back(flow.into()); + } + } + }; + } + (Protocol::Udp(src_port), Protocol::Udp(dst_port)) => { + let key = UdpFlowKey { + client, + resource, + src_ip, + dst_ip, + src_port, + dst_port, + }; + + match self.active_udp_flows.entry(key) { + hash_map::Entry::Vacant(vacant) => { + tracing::debug!(key = ?vacant.key(), "Creating new UDP flow"); + + vacant.insert(UdpFlowValue { + start: now_utc, + last_packet: now_utc, + stats: FlowStats::default().with_tx(payload_len as u64), + context, + }); + } + hash_map::Entry::Occupied(occupied) if occupied.get().context != context => { + let (key, value) = occupied.remove_entry(); + let context_diff = FlowContextDiff::new(value.context, context); + + let flow = CompletedUdpFlow::new(key, value, now_utc); + + tracing::debug!( + ?key, + ?context_diff, + "Splitting existing UDP flow; context changed" + ); + + self.completed_flows.push_back(flow.into()); + + self.active_udp_flows.insert( + key, + UdpFlowValue { + start: now_utc, + last_packet: now_utc, + stats: FlowStats::default().with_tx(payload_len as u64), + context, + }, + ); + } + hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + + value.stats.inc_tx(payload_len as u64); + value.last_packet = now_utc; + } + }; + } + (Protocol::Icmp(_), Protocol::Icmp(_)) => {} + _ => { + tracing::error!("src and dst protocol must be the same"); + } + } + } + + fn insert_inbound_tun_flow(&mut self, flow: InboundTun, now: Instant) { + let InboundTun { + inner: + InnerFlow { + src_ip, + dst_ip, + src_proto: Ok(src_proto), + dst_proto: Ok(dst_proto), + tcp_syn: _, + tcp_fin, + tcp_rst, + payload_len, + }, + outer: Some(_), + client: Some(client), + resource: Some(resource), + } = flow + else { + tracing::trace!(?flow, "Cannot create flow with missing data"); + + return; + }; + let now_utc = self.now_utc(now); + + match (src_proto, dst_proto) { + (Protocol::Tcp(src_port), Protocol::Tcp(dst_port)) => { + // For packets inbound from the TUN device, we need to flip src & dst. + let key = TcpFlowKey { + client, + resource, + src_ip: dst_ip, + dst_ip: src_ip, + src_port: dst_port, + dst_port: src_port, + }; + + match self.active_tcp_flows.entry(key) { + hash_map::Entry::Vacant(vacant) => { + if tcp_fin || tcp_rst { + // Don't care about FIN/RST packets where the flow no longer exists. + return; + } + + tracing::debug!(key = ?vacant.key(), "No existing TCP flow for packet inbound on TUN device"); + } + hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + value.stats.inc_rx(payload_len as u64); + value.last_packet = now_utc; + + if tcp_fin { + value.fin_rx = true; + } + + if tcp_rst { + let (key, value) = occupied.remove_entry(); + let flow = CompletedTcpFlow::new(key, value, now_utc); + + tracing::debug!(?flow, "TCP flow completed on inbound RST"); + + self.completed_flows.push_back(flow.into()); + } + } + }; + } + (Protocol::Udp(src_port), Protocol::Udp(dst_port)) => { + // For packets inbound from the TUN device, we need to flip src & dst. + let key = UdpFlowKey { + client, + resource, + src_ip: dst_ip, + dst_ip: src_ip, + src_port: dst_port, + dst_port: src_port, + }; + + match self.active_udp_flows.entry(key) { + hash_map::Entry::Vacant(vacant) => { + tracing::debug!(key = ?vacant.key(), "No existing UDP flow for packet inbound on TUN device"); + } + hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + value.stats.inc_rx(payload_len as u64); + value.last_packet = now_utc; + } + }; + } + (Protocol::Icmp(_), Protocol::Icmp(_)) => {} + _ => { + tracing::error!("src and dst protocol must be the same"); + } + } + } + + fn now_utc(&self, now: Instant) -> DateTime { + self.created_at_utc + now.duration_since(self.created_at) + } +} + +#[derive(Debug, derive_more::From)] +pub enum CompletedFlow { + Tcp(CompletedTcpFlow), + Udp(CompletedUdpFlow), +} + +#[derive(Debug)] +pub struct CompletedTcpFlow { + pub client: ClientId, + pub resource: ResourceId, + pub start: DateTime, + pub end: DateTime, + pub last_packet: DateTime, + + pub inner_src_ip: IpAddr, + pub inner_dst_ip: IpAddr, + pub inner_src_port: u16, + pub inner_dst_port: u16, + + pub outer_src_ip: IpAddr, + pub outer_dst_ip: IpAddr, + pub outer_src_port: u16, + pub outer_dst_port: u16, + + pub rx_packets: u64, + pub tx_packets: u64, + pub rx_bytes: u64, + pub tx_bytes: u64, +} + +#[derive(Debug)] +pub struct CompletedUdpFlow { + pub client: ClientId, + pub resource: ResourceId, + pub start: DateTime, + pub end: DateTime, + pub last_packet: DateTime, + + pub inner_src_ip: IpAddr, + pub inner_dst_ip: IpAddr, + pub inner_src_port: u16, + pub inner_dst_port: u16, + + pub outer_src_ip: IpAddr, + pub outer_dst_ip: IpAddr, + pub outer_src_port: u16, + pub outer_dst_port: u16, + + pub rx_packets: u64, + pub tx_packets: u64, + pub rx_bytes: u64, + pub tx_bytes: u64, +} + +impl CompletedTcpFlow { + fn new(key: TcpFlowKey, value: TcpFlowValue, end: DateTime) -> Self { + Self { + client: key.client, + resource: key.resource, + start: value.start, + end, + last_packet: value.last_packet, + inner_src_ip: key.src_ip, + inner_dst_ip: key.dst_ip, + inner_src_port: key.src_port, + inner_dst_port: key.dst_port, + outer_src_ip: value.context.src_ip, + outer_dst_ip: value.context.dst_ip, + outer_src_port: value.context.src_port, + outer_dst_port: value.context.dst_port, + rx_packets: value.stats.rx_packets, + tx_packets: value.stats.tx_packets, + rx_bytes: value.stats.rx_bytes, + tx_bytes: value.stats.tx_bytes, + } + } +} + +impl CompletedUdpFlow { + fn new(key: UdpFlowKey, value: UdpFlowValue, end: DateTime) -> Self { + Self { + client: key.client, + resource: key.resource, + start: value.start, + end, + last_packet: value.last_packet, + inner_src_ip: key.src_ip, + inner_dst_ip: key.dst_ip, + inner_src_port: key.src_port, + inner_dst_port: key.dst_port, + outer_src_ip: value.context.src_ip, + outer_dst_ip: value.context.dst_ip, + outer_src_port: value.context.src_port, + outer_dst_port: value.context.dst_port, + rx_packets: value.stats.rx_packets, + tx_packets: value.stats.tx_packets, + rx_bytes: value.stats.rx_bytes, + tx_bytes: value.stats.tx_bytes, + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +struct TcpFlowKey { + client: ClientId, + resource: ResourceId, + src_ip: IpAddr, + dst_ip: IpAddr, + src_port: u16, + dst_port: u16, +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +struct UdpFlowKey { + client: ClientId, + resource: ResourceId, + src_ip: IpAddr, + dst_ip: IpAddr, + src_port: u16, + dst_port: u16, +} + +#[derive(Debug)] +struct TcpFlowValue { + start: DateTime, + last_packet: DateTime, + stats: FlowStats, + context: FlowContext, + + fin_tx: bool, + fin_rx: bool, +} + +#[derive(Debug)] +struct UdpFlowValue { + start: DateTime, + last_packet: DateTime, + stats: FlowStats, + context: FlowContext, +} + +#[derive(Debug, Default)] +struct FlowStats { + rx_packets: u64, + tx_packets: u64, + rx_bytes: u64, + tx_bytes: u64, +} + +impl FlowStats { + fn with_tx(mut self, payload_len: u64) -> Self { + self.inc_tx(payload_len); + + self + } + + fn inc_tx(&mut self, payload_len: u64) { + self.tx_packets += 1; + self.tx_bytes += payload_len; + } + + fn inc_rx(&mut self, payload_len: u64) { + self.rx_packets += 1; + self.rx_bytes += payload_len; + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +struct FlowContext { + src_ip: IpAddr, + dst_ip: IpAddr, + src_port: u16, + dst_port: u16, +} + +#[derive(PartialEq, Eq)] +struct FlowContextDiff { + src_ip: Option<(IpAddr, IpAddr)>, + dst_ip: Option<(IpAddr, IpAddr)>, + src_port: Option<(u16, u16)>, + dst_port: Option<(u16, u16)>, +} + +impl FlowContextDiff { + fn new(old: FlowContext, new: FlowContext) -> Self { + let src_ip_diff = (old.src_ip != new.src_ip).then_some((old.src_ip, new.src_ip)); + let dst_ip_diff = (old.dst_ip != new.dst_ip).then_some((old.dst_ip, new.dst_ip)); + let src_port_diff = (old.src_port != new.src_port).then_some((old.src_port, new.src_port)); + let dst_port_diff = (old.dst_port != new.dst_port).then_some((old.dst_port, new.dst_port)); + + Self { + src_ip: src_ip_diff, + dst_ip: dst_ip_diff, + src_port: src_port_diff, + dst_port: dst_port_diff, + } + } +} + +impl std::fmt::Debug for FlowContextDiff { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut debug_struct = f.debug_struct("FlowContextDiff"); + + if let Some((old, new)) = self.src_ip { + debug_struct + .field("old_src_ip", &old) + .field("new_src_ip", &new); + } + if let Some((old, new)) = self.dst_ip { + debug_struct + .field("old_dst_ip", &old) + .field("new_dst_ip", &new); + } + if let Some((old, new)) = self.src_port { + debug_struct + .field("old_src_port", &old) + .field("new_src_port", &new); + } + if let Some((old, new)) = self.dst_port { + debug_struct + .field("old_dst_port", &old) + .field("new_dst_port", &new); + } + + debug_struct.finish() + } +} + +pub mod inbound_wg { + use super::*; + + pub fn record_client(cid: ClientId) { + update_current_flow_inbound_wireguard(|wg| wg.client.replace(cid)); + } + + pub fn record_resource(rid: ResourceId) { + update_current_flow_inbound_wireguard(|wg| wg.resource.replace(rid)); + } + + pub fn record_decrypted_packet(packet: &IpPacket) { + update_current_flow_inbound_wireguard(|wg| { + wg.inner = Some(InnerFlow::from(packet)); + }); + } + + pub fn record_translated_packet(packet: &IpPacket) { + update_current_flow_inbound_wireguard(|wg| { + let Some(inner) = wg.inner.as_mut() else { + return; + }; + + inner.dst_ip = packet.destination(); + }); + } + + pub fn record_icmp_error(packet: &IpPacket) { + let Ok(Some((_, icmp_error))) = packet.icmp_error() else { + return; + }; + + update_current_flow_inbound_wireguard(|wg| wg.icmp_error.replace(icmp_error)); + } +} + +pub mod inbound_tun { + use super::*; + + pub fn record_client(cid: ClientId) { + update_current_flow_inbound_tun(|tun| tun.client.replace(cid)); + } + + pub fn record_resource(rid: ResourceId) { + update_current_flow_inbound_tun(|tun| tun.resource.replace(rid)); + } + + pub fn record_wireguard_packet(local: Option, remote: SocketAddr) { + update_current_flow_inbound_tun(|tun| tun.outer = Some(OuterFlow { local, remote })); + } +} + +fn update_current_flow_inbound_wireguard(f: impl FnOnce(&mut InboundWireGuard) -> R) { + CURRENT_FLOW.with_borrow_mut(|c| { + let Some(FlowData::InboundWireGuard(wg)) = c else { + return; + }; + + f(wg); + }); +} + +fn update_current_flow_inbound_tun(f: impl FnOnce(&mut InboundTun) -> R) { + CURRENT_FLOW.with_borrow_mut(|c| { + let Some(FlowData::InboundTun(tun)) = c else { + return; + }; + + f(tun); + }); +} + +pub struct CurrentFlowGuard<'a> { + inner: &'a mut FlowTracker, + created_at: Instant, +} + +impl<'a> Drop for CurrentFlowGuard<'a> { + fn drop(&mut self) { + let Some(current_flow) = CURRENT_FLOW.replace(None) else { + return; + }; + + match current_flow { + FlowData::InboundWireGuard(flow) => self + .inner + .insert_inbound_wireguard_flow(flow, self.created_at), + FlowData::InboundTun(flow) => self.inner.insert_inbound_tun_flow(flow, self.created_at), + } + } +} + +enum FlowData { + InboundWireGuard(InboundWireGuard), + InboundTun(InboundTun), +} + +#[derive(Debug)] +struct InboundWireGuard { + outer: OuterFlow, + inner: Option, + client: Option, + resource: Option, + icmp_error: Option, +} + +#[derive(Debug)] +struct InboundTun { + inner: InnerFlow, + outer: Option>>, + client: Option, + resource: Option, +} + +#[derive(Debug)] +struct OuterFlow { + local: L, + remote: SocketAddr, +} + +#[derive(Debug)] +struct InnerFlow { + src_ip: IpAddr, + dst_ip: IpAddr, + src_proto: Result, + dst_proto: Result, + + tcp_syn: bool, + tcp_fin: bool, + tcp_rst: bool, + + payload_len: usize, +} + +impl From<&IpPacket> for InnerFlow { + fn from(packet: &IpPacket) -> Self { + InnerFlow { + src_ip: packet.source(), + dst_ip: packet.destination(), + src_proto: packet.source_protocol(), + dst_proto: packet.destination_protocol(), + tcp_syn: packet.as_tcp().map(|tcp| tcp.syn()).unwrap_or(false), + tcp_fin: packet.as_tcp().map(|tcp| tcp.fin()).unwrap_or(false), + tcp_rst: packet.as_tcp().map(|tcp| tcp.rst()).unwrap_or(false), + payload_len: packet.layer4_payload_len(), + } + } +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use super::*; + + #[test] + fn flow_context_diff_rendering() { + let old = FlowContext { + src_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), + dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)), + src_port: 8080, + dst_port: 443, + }; + let new = FlowContext { + src_ip: IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), + dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)), + src_port: 50000, + dst_port: 443, + }; + + let diff = FlowContextDiff::new(old, new); + + assert_eq!( + "FlowContextDiff { old_src_ip: 10.0.0.1, new_src_ip: 1.1.1.1, old_src_port: 8080, new_src_port: 50000 }", + format!("{diff:?}") + ); + } +} diff --git a/scripts/tests/download-concurrent.sh b/scripts/tests/download-concurrent.sh new file mode 100755 index 000000000..71967393b --- /dev/null +++ b/scripts/tests/download-concurrent.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +source "./scripts/tests/lib.sh" + +client sh -c "curl --fail --max-time 10 --output /tmp/download1.file http://download.httpbin/bytes?num=5000000" & +PID1=$! + +client sh -c "curl --fail --max-time 10 --output /tmp/download2.file http://download.httpbin/bytes?num=5000000" & +PID2=$! + +client sh -c "curl --fail --max-time 10 --output /tmp/download3.file http://download.httpbin/bytes?num=5000000" & +PID3=$! + +wait $PID1 || { + echo "Download 1 failed" + exit 1 +} + +wait $PID2 || { + echo "Download 2 failed" + exit 1 +} + +wait $PID3 || { + echo "Download 3 failed" + exit 1 +} + +sleep 3 +readarray -t flows < <(get_flow_logs "tcp") + +assert_eq "${#flows[@]}" 3 + +for flow in "${flows[@]}"; do + assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101" + assert_gteq "$(get_flow_field "$flow" "rx_bytes")" 5000000 +done diff --git a/scripts/tests/download-roaming-network.sh b/scripts/tests/download-roaming-network.sh index e45de6beb..f7af12126 100755 --- a/scripts/tests/download-roaming-network.sh +++ b/scripts/tests/download-roaming-network.sh @@ -43,3 +43,24 @@ if [[ "$computed_checksum" != "$known_checksum" ]]; then echo "Checksum of downloaded file does not match" exit 1 fi + +sleep 3 +readarray -t flows < <(get_flow_logs "tcp") + +assert_gteq "${#flows[@]}" 2 + +# All flows should have same inner_dst_ip +for flow in "${flows[@]}"; do + assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101" +done + +# Verify different outer_src_port after roaming (network change) +# The docker-compose setup uses routers and therefore the source IP is always the router. +# But conntrack on the router will allocate a new source port because the binding on the old one is still active after roaming. +original_src_port=$(get_flow_field "${flows[0]}" "outer_src_port") + +for ((i = 1; i < ${#flows[@]}; i++)); do + next_src_port=$(get_flow_field "${flows[i]}" "outer_src_port") + + assert_ne "$original_src_port" "$next_src_port" +done diff --git a/scripts/tests/download-rst.sh b/scripts/tests/download-rst.sh new file mode 100755 index 000000000..3b844a902 --- /dev/null +++ b/scripts/tests/download-rst.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +source "./scripts/tests/lib.sh" + +# 2 seconds are not enough at the given speed to download the file, curl will therefore abort and RST the connection. +client sh -c "curl --max-time 2 --limit-rate 1000000 --no-keepalive --parallel-max 1 --output /dev/null http://download.httpbin/bytes?num=100000000" & +DOWNLOAD_PID=$! + +wait $DOWNLOAD_PID || true # The download fails but we want to continue. + +sleep 3 +readarray -t flows < <(get_flow_logs "tcp") + +assert_gteq "${#flows[@]}" 1 + +rx_bytes=0 + +# All flows should have same inner_dst_ip +for flow in "${flows[@]}"; do + assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101" + rx_bytes+="$(get_flow_field "$flow" "rx_bytes")" +done + +assert_gteq "$rx_bytes" 2000000 diff --git a/scripts/tests/download.sh b/scripts/tests/download.sh index 63acf8343..fc8cd9e05 100755 --- a/scripts/tests/download.sh +++ b/scripts/tests/download.sh @@ -18,3 +18,12 @@ if [[ "$computed_checksum" != "$known_checksum" ]]; then echo "Checksum of downloaded file does not match" exit 1 fi + +sleep 3 +readarray -t flows < <(get_flow_logs "tcp") + +assert_eq "${#flows[@]}" 1 + +flow="${flows[0]}" +assert_eq "$(get_flow_field "$flow" "inner_dst_ip")" "172.21.0.101" +assert_gteq "$(get_flow_field "$flow" "rx_bytes")" 10000000 diff --git a/scripts/tests/lib.sh b/scripts/tests/lib.sh index 370b43036..6e0c57385 100755 --- a/scripts/tests/lib.sh +++ b/scripts/tests/lib.sh @@ -50,7 +50,7 @@ Domain.PubSub.Account.broadcast(account_id, {{:reject_access, gateway_id}, clien " } -function assert_equals() { +function assert_eq() { local actual="$1" local expected="$2" @@ -60,6 +60,26 @@ function assert_equals() { fi } +function assert_ne() { + local actual="$1" + local expected="$2" + + if [[ "$expected" == "$actual" ]]; then + echo "Expected values to differ but both are $actual" + exit 1 + fi +} + +function assert_gteq() { + local actual="$1" + local expected="$2" + + if [ "$actual" -lt "$expected" ]; then + echo "Expected $actual to be greater than or equal to $expected" + exit 1 + fi +} + function process_state() { local container="$1" @@ -70,7 +90,7 @@ function assert_process_state { local container="$1" local expected_state="$2" - assert_equals "$(process_state "$container")" "$expected_state" + assert_eq "$(process_state "$container")" "$expected_state" } function create_token_file { @@ -96,3 +116,23 @@ function expect_error() { return 0 fi } + +# Extract flow logs from gateway for a given protocol +# Returns flow log lines (use with readarray) +# Usage: readarray -t flows < <(get_flow_logs "tcp") +function get_flow_logs() { + local protocol="$1" + + docker compose logs gateway --since 30s 2>/dev/null | + grep "flow_logs::${protocol}.*flow completed" || true +} + +# Extract a field value from a flow log line +# Usage: get_flow_field +# Example: get_flow_field "$flow" "inner_dst_ip" +function get_flow_field() { + local flow_log="$1" + local field_name="$2" + + echo "$flow_log" | grep -oP "${field_name}=\K[^ ]+" || echo "" +}