Gabi/fix relay expected message size (#1911)

This PR should fix the way we handle the `length` field in the
`DataChannel` messages, previous to this fix relaying data (using the
`webrtc-rs` crate) was impossible)

The new way to handle this is if the actual message is bigger than what
this data field says we ignore the extra bytes (which I think is the
correct way to do it according to spec)

Also, I added an integration test to verify relay messages using
`iptables`, not the cleanest way to do it but the easiest, in this vein
I tried to fix the caching for rust containers since 2 integration test
in our current state would take ~20 minutes each.
This commit is contained in:
Gabi
2023-08-16 15:29:51 -05:00
committed by GitHub
parent f59ed16dca
commit 577ce43942
4 changed files with 91 additions and 28 deletions

View File

@@ -23,6 +23,12 @@ jobs:
api.cache-to=scope=api,type=gha,mode=max
web.cache-from=scope=web,type=gha
web.cache-to=scope=web,type=gha,mode=max
client.cache-from=scope=rust,type=gha
client.cache-to=scope=rust,type=gha,mode=max
gateway.cache-from=scope=rust,type=gha
gateway.cache-to=scope=rust,type=gha,mode=max
relay.cache-from=scope=rust,type=gha
relay.cache-to=scope=rust,type=gha,mode=max
files: docker-compose.yml
push: false
- name: Seed database
@@ -30,12 +36,42 @@ jobs:
- name: Start docker compose in the background
run: docker compose up -d
- name: Test that client can ping resource
# FIXME: When the client sends the first packet trying to connect but there's no relay
# the portal responds with that and we don't try to continue the flow until we receive a success
# response with all the relays, thus we just sleep here waiting for the relay to have its presence tracked
# by the portal.
# The fix next will be:
# * If the relay list comes back as an error, retry a few times
# * If the it still comes as an error try a direct connection(local network)
# Right now this is working because we wait for the relay to be up and running before starting the client
run: docker compose exec -it client ping 172.20.0.100 -c 20
integration-test_relayed-flow:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Build images
uses: docker/bake-action@v3.1.0
with:
set: |
elixir.cache-from=scope=elixir,type=gha
elixir.cache-to=scope=elixir,type=gha,mode=max
api.cache-from=scope=api,type=gha
api.cache-to=scope=api,type=gha,mode=max
web.cache-from=scope=web,type=gha
web.cache-to=scope=web,type=gha,mode=max
client.cache-from=scope=rust,type=gha
client.cache-to=scope=rust,type=gha,mode=max
gateway.cache-from=scope=rust,type=gha
gateway.cache-to=scope=rust,type=gha,mode=max
relay.cache-from=scope=rust,type=gha
relay.cache-to=scope=rust,type=gha,mode=max
files: docker-compose.yml
push: false
- name: Seed database
run: docker compose run elixir /bin/sh -c "cd apps/domain && mix ecto.seed"
- name: Start docker compose in the background
run: docker compose up -d
# This rule forces to use the relay between client and gateway.
#
- name: Disallow traffic between gateway and client container
run: |
sudo iptables -I FORWARD 1 -s 172.28.0.100 -d 172.28.0.105 -j DROP
sudo iptables -I FORWARD 1 -s 172.28.0.105 -d 172.28.0.100 -j DROP
- name: Test that client can ping resource
run: docker compose exec -it client ping 172.20.0.100 -c 20

View File

@@ -166,8 +166,9 @@ services:
api:
condition: 'service_healthy'
networks:
- app
- resources
app:
ipv4_address: 172.28.0.105
resources:
resource:
image: alpine:3.18
@@ -204,7 +205,6 @@ services:
networks:
app:
ipv4_address: 172.28.0.101
ipv6_address: 2001:3990:3990::101
api:
build:
@@ -347,11 +347,10 @@ networks:
config:
- subnet: 172.20.0.0/16
app:
enable_ipv6: true
enable_ipv6: false
ipam:
config:
- subnet: 172.28.0.0/16
- subnet: 2001:3990:3990::/64
volumes:
postgres-data:

View File

@@ -1,5 +1,6 @@
use crate::Binding;
use proptest::arbitrary::any;
use proptest::collection::vec;
use proptest::strategy::Just;
use proptest::strategy::Strategy;
use proptest::string::string_regex;
@@ -29,6 +30,13 @@ pub fn channel_number() -> impl Strategy<Value = ChannelNumber> {
(ChannelNumber::MIN..ChannelNumber::MAX).prop_map(|n| ChannelNumber::new(n).unwrap())
}
pub fn channel_payload() -> impl Strategy<Value = (Vec<u8>, usize)> {
vec(any::<u8>(), 0..(u16::MAX as usize)).prop_flat_map(|vec| {
let len = vec.len();
(Just(vec), 0..len)
})
}
pub fn username_salt() -> impl Strategy<Value = String> {
string_regex("[a-zA-Z0-9]{10}").unwrap()
}

View File

@@ -1,6 +1,8 @@
use bytes::{BufMut, BytesMut};
use std::io;
const HEADER_LEN: usize = 4;
#[derive(Debug, PartialEq)]
pub struct ChannelData<'a> {
channel: u16,
@@ -9,14 +11,16 @@ pub struct ChannelData<'a> {
impl<'a> ChannelData<'a> {
pub fn parse(data: &'a [u8]) -> Result<Self, io::Error> {
if data.len() < 4 {
if data.len() < HEADER_LEN {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"channel data messages are at least 4 bytes long",
));
}
let channel_number = u16::from_be_bytes([data[0], data[1]]);
let (header, payload) = data.split_at(HEADER_LEN);
let channel_number = u16::from_be_bytes([header[0], header[1]]);
if !(0x4000..=0x7FFF).contains(&channel_number) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
@@ -24,39 +28,33 @@ impl<'a> ChannelData<'a> {
));
}
let length = u16::from_be_bytes([data[2], data[3]]);
let length = u16::from_be_bytes([header[2], header[3]]) as usize;
let actual_payload_length = data.len() - 4;
if actual_payload_length != length as usize {
if payload.len() < length {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"channel data message specified {length} bytes but got {actual_payload_length}"
"channel data message specified {length} bytes but the payload is only {} bytes", payload.len()
),
));
}
Ok(ChannelData {
channel: channel_number,
data: &data[4..],
data: &payload[..length],
})
}
pub fn new(channel: u16, data: &'a [u8]) -> Self {
debug_assert!(channel > 0x400);
debug_assert!(channel < 0x7FFF);
debug_assert!(data.len() <= u16::MAX as usize);
ChannelData { channel, data }
}
// Panics if self.data.len() > u16::MAX
pub fn to_bytes(&self) -> Vec<u8> {
let mut message = BytesMut::with_capacity(2 + 2 + self.data.len());
message.put_slice(&self.channel.to_be_bytes());
message.put_u16(self.data.len() as u16);
message.put_slice(self.data);
message.freeze().into()
to_bytes(self.channel, self.data.len() as u16, self.data)
}
pub fn channel(&self) -> u16 {
@@ -68,6 +66,16 @@ impl<'a> ChannelData<'a> {
}
}
fn to_bytes(channel: u16, len: u16, payload: &[u8]) -> Vec<u8> {
let mut message = BytesMut::with_capacity(HEADER_LEN + (len as usize));
message.put_u16(channel);
message.put_u16(len);
message.put_slice(payload);
message.freeze().into()
}
#[cfg(all(test, feature = "proptest"))]
mod tests {
use super::*;
@@ -85,4 +93,16 @@ mod tests {
assert_eq!(channel_data, parsed)
}
#[test_strategy::proptest]
fn channel_data_decoding(
#[strategy(crate::proptest::channel_number())] channel: ChannelNumber,
#[strategy(crate::proptest::channel_payload())] payload: (Vec<u8>, usize),
) {
let encoded = to_bytes(channel.value(), payload.1 as u16, &payload.0);
let parsed = ChannelData::parse(&encoded).unwrap();
assert_eq!(channel.value(), parsed.channel);
assert_eq!(&payload.0[..payload.1], parsed.data)
}
}