feat(gateway): emit resource name and address in flow logs (#10710)

To allow for better analysis of flow logs, we embed the resource name
and its address into the flow flogs. For the Internet Resource, the name
will be displayed as "Internet` and the address is either `0.0.0.0/0` or
`::/0` depending on the IP version of the packet. For CIDR resources,
the address is the subnet and for DNS resources, it is the domain
pattern.

Resolves: #10693
This commit is contained in:
Thomas Eizinger
2025-10-27 11:01:26 +11:00
committed by GitHub
parent 1e295742bc
commit 5eef5f90df
3 changed files with 92 additions and 19 deletions

View File

@@ -465,8 +465,10 @@ impl GatewayState {
tracing::trace!(
target: "flow_logs::tcp",
client = %flow.client,
resource = %flow.resource,
client_id = %flow.client,
resource_id = %flow.resource_id,
resource_name = %flow.resource_name,
resource_address = %flow.resource_address,
start = ?flow.start,
end = ?flow.end,
last_packet = ?flow.last_packet,
@@ -493,8 +495,10 @@ impl GatewayState {
tracing::trace!(
target: "flow_logs::udp",
client = %flow.client,
resource = %flow.resource,
client_id = %flow.client,
resource_id = %flow.resource_id,
resource_name = %flow.resource_name,
resource_address = %flow.resource_address,
start = ?flow.start,
end = ?flow.end,
last_packet = ?flow.last_packet,

View File

@@ -515,7 +515,16 @@ impl ClientOnGateway {
let rid = self.classify_resource(packet.destination(), packet.destination_protocol())?;
flow_tracker::inbound_wg::record_resource(rid);
let Some(resource) = self.resources.get(&rid) else {
tracing::warn!(%rid, "Internal state mismatch: No resource for ID");
return Ok(());
};
flow_tracker::inbound_wg::record_resource(
rid,
resource.name(),
resource.address(packet.destination()),
);
Ok(())
}
@@ -564,11 +573,13 @@ impl ClientOnGateway {
#[derive(Debug)]
enum ResourceOnGateway {
Cidr {
name: String,
network: IpNetwork,
filters: Filters,
expires_at: Option<DateTime<Utc>>,
},
Dns {
name: String,
address: String,
domains: BTreeMap<DomainName, BTreeSet<IpAddr>>,
filters: Filters,
@@ -583,12 +594,14 @@ impl ResourceOnGateway {
fn new(resource: ResourceDescription, expires_at: Option<DateTime<Utc>>) -> Self {
match resource {
ResourceDescription::Dns(r) => ResourceOnGateway::Dns {
name: r.name,
domains: BTreeMap::default(),
filters: r.filters,
address: r.address,
expires_at,
},
ResourceDescription::Cidr(r) => ResourceOnGateway::Cidr {
name: r.name,
network: r.address,
filters: r.filters,
expires_at,
@@ -668,6 +681,25 @@ impl ResourceOnGateway {
fn is_internet_resource(&self) -> bool {
matches!(self, ResourceOnGateway::Internet { .. })
}
fn name(&self) -> String {
match self {
ResourceOnGateway::Cidr { name, .. } => name.clone(),
ResourceOnGateway::Dns { name, .. } => name.clone(),
ResourceOnGateway::Internet { .. } => "Internet".to_owned(),
}
}
fn address(&self, dst: IpAddr) -> String {
match self {
ResourceOnGateway::Cidr { network, .. } => network.to_string(),
ResourceOnGateway::Dns { address, .. } => address.clone(),
ResourceOnGateway::Internet { .. } => match dst {
IpAddr::V4(_) => "0.0.0.0/0".to_owned(),
IpAddr::V6(_) => "::/0".to_owned(),
},
}
}
}
// Current state of a translation for a given proxy ip

View File

@@ -183,7 +183,7 @@ impl FlowTracker {
(Protocol::Tcp(src_port), Protocol::Tcp(dst_port)) => {
let key = TcpFlowKey {
client,
resource,
resource: resource.id,
src_ip,
dst_ip,
src_port,
@@ -207,6 +207,8 @@ impl FlowTracker {
fin_tx: false,
fin_rx: false,
domain,
resource_name: resource.name,
resource_address: resource.address,
});
}
hash_map::Entry::Occupied(occupied) if occupied.get().context != context => {
@@ -233,6 +235,8 @@ impl FlowTracker {
fin_tx: false,
fin_rx: false,
domain,
resource_name: resource.name,
resource_address: resource.address,
},
);
}
@@ -255,6 +259,8 @@ impl FlowTracker {
fin_tx: false,
fin_rx: false,
domain,
resource_name: resource.name,
resource_address: resource.address,
},
);
}
@@ -281,7 +287,7 @@ impl FlowTracker {
(Protocol::Udp(src_port), Protocol::Udp(dst_port)) => {
let key = UdpFlowKey {
client,
resource,
resource: resource.id,
src_ip,
dst_ip,
src_port,
@@ -298,13 +304,15 @@ impl FlowTracker {
stats: FlowStats::default().with_tx(payload_len as u64),
context,
domain,
resource_name: resource.name,
resource_address: resource.address,
});
}
hash_map::Entry::Occupied(occupied) if occupied.get().context != context => {
let (key, value) = occupied.remove_entry();
let context_diff = FlowContextDiff::new(value.context, context);
let flow = CompletedUdpFlow::new(key, value, now_utc);
let flow = CompletedUdpFlow::new(key, value.clone(), now_utc);
tracing::debug!(
?key,
@@ -322,6 +330,8 @@ impl FlowTracker {
stats: FlowStats::default().with_tx(payload_len as u64),
context,
domain,
resource_name: value.resource_name,
resource_address: value.resource_address,
},
);
}
@@ -448,7 +458,11 @@ pub enum CompletedFlow {
#[derive(Debug)]
pub struct CompletedTcpFlow {
pub client: ClientId,
pub resource: ResourceId,
pub resource_id: ResourceId,
pub resource_name: String,
pub resource_address: String,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub last_packet: DateTime<Utc>,
@@ -473,7 +487,11 @@ pub struct CompletedTcpFlow {
#[derive(Debug)]
pub struct CompletedUdpFlow {
pub client: ClientId,
pub resource: ResourceId,
pub resource_id: ResourceId,
pub resource_name: String,
pub resource_address: String,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub last_packet: DateTime<Utc>,
@@ -499,7 +517,9 @@ impl CompletedTcpFlow {
fn new(key: TcpFlowKey, value: TcpFlowValue, end: DateTime<Utc>) -> Self {
Self {
client: key.client,
resource: key.resource,
resource_id: key.resource,
resource_name: value.resource_name,
resource_address: value.resource_address,
start: value.start,
end,
last_packet: value.last_packet,
@@ -524,7 +544,9 @@ impl CompletedUdpFlow {
fn new(key: UdpFlowKey, value: UdpFlowValue, end: DateTime<Utc>) -> Self {
Self {
client: key.client,
resource: key.resource,
resource_id: key.resource,
resource_name: value.resource_name,
resource_address: value.resource_address,
start: value.start,
end,
last_packet: value.last_packet,
@@ -574,11 +596,14 @@ struct TcpFlowValue {
domain: Option<DomainName>,
resource_name: String,
resource_address: String,
fin_tx: bool,
fin_rx: bool,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct UdpFlowValue {
start: DateTime<Utc>,
last_packet: DateTime<Utc>,
@@ -586,9 +611,12 @@ struct UdpFlowValue {
context: FlowContext,
domain: Option<DomainName>,
resource_name: String,
resource_address: String,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Copy)]
struct FlowStats {
rx_packets: u64,
tx_packets: u64,
@@ -684,8 +712,10 @@ pub mod inbound_wg {
update_current_flow_inbound_wireguard(|wg| wg.client.replace(cid));
}
pub fn record_resource(rid: ResourceId) {
update_current_flow_inbound_wireguard(|wg| wg.resource.replace(rid));
pub fn record_resource(id: ResourceId, name: String, address: String) {
update_current_flow_inbound_wireguard(|wg| {
wg.resource.replace(Resource { id, name, address })
});
}
pub fn record_domain(name: DomainName) {
@@ -724,8 +754,8 @@ pub mod inbound_tun {
update_current_flow_inbound_tun(|tun| tun.client.replace(cid));
}
pub fn record_resource(rid: ResourceId) {
update_current_flow_inbound_tun(|tun| tun.resource.replace(rid));
pub fn record_resource(id: ResourceId) {
update_current_flow_inbound_tun(|wg| wg.resource.replace(id));
}
pub fn record_wireguard_packet(local: Option<SocketAddr>, remote: SocketAddr) {
@@ -783,7 +813,7 @@ struct InboundWireGuard {
outer: OuterFlow<SocketAddr>,
inner: Option<InnerFlow>,
client: Option<ClientId>,
resource: Option<ResourceId>,
resource: Option<Resource>,
/// The domain name in case this packet is for a DNS resource.
domain: Option<DomainName>,
icmp_error: Option<IcmpError>,
@@ -817,6 +847,13 @@ struct InnerFlow {
payload_len: usize,
}
#[derive(Debug)]
struct Resource {
id: ResourceId,
name: String,
address: String,
}
impl From<&IpPacket> for InnerFlow {
fn from(packet: &IpPacket) -> Self {
InnerFlow {