relay: refresh allocations (#1610)

This commit is contained in:
Thomas Eizinger
2023-05-15 23:21:02 +02:00
committed by GitHub
parent 01f33ed4a0
commit 675cb2dd54
5 changed files with 334 additions and 28 deletions

140
rust/Cargo.lock generated
View File

@@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04"
dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.71"
@@ -14,6 +23,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytecodec"
version = "0.4.15"
@@ -30,6 +45,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]]
name = "cfg-if"
version = "1.0.0"
@@ -51,6 +72,40 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "env_logger"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "futures"
version = "0.3.28"
@@ -160,12 +215,24 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hex-literal"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
[[package]]
name = "hmac-sha1"
version = "0.1.3"
@@ -175,6 +242,35 @@ dependencies = [
"sha1",
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "io-lifetimes"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
dependencies = [
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@@ -187,6 +283,12 @@ version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "linux-raw-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"
[[package]]
name = "log"
version = "0.4.17"
@@ -245,7 +347,7 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"hermit-abi 0.2.6",
"libc",
]
@@ -333,6 +435,8 @@ version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.1",
]
@@ -363,8 +467,10 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytecodec",
"env_logger",
"futures",
"hex",
"hex-literal",
"rand",
"stun_codec",
"tokio",
@@ -372,6 +478,20 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "rustix"
version = "0.37.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "sha1"
version = "0.2.0"
@@ -448,6 +568,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]]
name = "thread_local"
version = "1.1.7"
@@ -611,6 +740,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@@ -8,8 +8,10 @@ anyhow = "1.0.71"
bytecodec = "0.4.15"
futures = "0.3.28"
hex = "0.4.3"
hex-literal = "0.4.1"
rand = "0.8.5"
stun_codec = "0.3.1"
tokio = { version = "1.28.0", features = ["macros", "rt-multi-thread", "net", "time"] }
tracing = { version = "0.1.37", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
env_logger = "0.10.0"

View File

@@ -14,12 +14,12 @@ use stun_codec::rfc5389::errors::BadRequest;
use stun_codec::rfc5389::methods::BINDING;
use stun_codec::rfc5766::attributes::{Lifetime, RequestedTransport, XorRelayAddress};
use stun_codec::rfc5766::errors::{AllocationMismatch, InsufficientCapacity};
use stun_codec::rfc5766::methods::ALLOCATE;
use stun_codec::{Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId};
use stun_codec::rfc5766::methods::{ALLOCATE, REFRESH};
use stun_codec::{Message, MessageClass, MessageDecoder, MessageEncoder, Method, TransactionId};
/// A sans-IO STUN & TURN server.
///
/// A [`Server`] is bound to pair of IPv4 and IPv6 addresses and assumes to only operate on UDP.
/// A [`Server`] is bound to an IPv4 address and assumes to only operate on UDP.
/// Thus, 3 out of the 5 components of a "5-tuple" are unique to an instance of [`Server`] and
/// we can index data simply by the sender's [`SocketAddr`].
///
@@ -82,7 +82,7 @@ impl fmt::Display for AllocationId {
}
}
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-requested-transport>
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-requested-transport>.
const UDP_TRANSPORT: u8 = 17;
/// The maximum number of ports available for allocation.
@@ -93,7 +93,7 @@ const MAX_ALLOCATION_LIFETIME: Duration = Duration::from_secs(3600);
/// The default lifetime of an allocation.
///
/// TODO: This has been chosen at random by Thomas. Revisit if it makes sense.
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-allocations-2>.
const DEFAULT_ALLOCATION_LIFETIME: Duration = Duration::from_secs(600);
impl Server {
@@ -188,7 +188,19 @@ where
let transaction_id = message.transaction_id();
if let Err(e) = self.handle_allocate_request(message, sender, now) {
self.send_message(allocate_error_response(transaction_id, e), sender);
self.send_message(error_response(transaction_id, ALLOCATE, e), sender);
}
return;
}
if message.class() == MessageClass::Request && message.method() == REFRESH {
tracing::debug!("Received TURN refresh request from: {sender}");
let transaction_id = message.transaction_id();
if let Err(e) = self.handle_refresh_request(message, sender, now) {
self.send_message(error_response(transaction_id, ALLOCATE, e), sender);
}
return;
@@ -259,15 +271,10 @@ where
message.add_attribute(XorMappedAddress::new(sender).into());
message.add_attribute(effective_lifetime.clone().into());
self.time_events.add(
let wake_deadline = self.time_events.add(
allocation.expires_at,
TimedAction::ExpireAllocation(allocation.id),
);
let wake_deadline = self
.time_events
.next_trigger()
.expect("we just pushed a time event");
self.pending_commands.push_back(Command::Wake {
deadline: wake_deadline,
});
@@ -289,6 +296,64 @@ where
Ok(())
}
/// Handle a TURN refresh request.
///
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-a-refresh-request> for details.
fn handle_refresh_request(
&mut self,
message: Message<Attribute>,
sender: SocketAddr,
now: Instant,
) -> Result<(), ErrorCode> {
// TODO: Verify that this is the correct error code.
let allocation = self
.allocations
.get_mut(&sender)
.ok_or(ErrorCode::from(AllocationMismatch))?;
let requested_lifetime = message.get_attribute::<Lifetime>();
let effective_lifetime = compute_effective_lifetime(requested_lifetime);
if effective_lifetime.lifetime().is_zero() {
let port = allocation.port;
self.pending_commands
.push_back(Command::FreeAddresses { id: allocation.id });
self.allocations.remove(&sender);
self.used_ports.remove(&port);
self.send_message(
refresh_success_response(effective_lifetime, message.transaction_id()),
sender,
);
tracing::info!("Deleted allocation for {sender} on port {port}");
return Ok(());
}
allocation.expires_at = now + effective_lifetime.lifetime();
tracing::info!(
"Refreshed allocation for {sender} on port {} and lifetime {}s",
allocation.port,
effective_lifetime.lifetime().as_secs()
);
let wake_deadline = self.time_events.add(
allocation.expires_at,
TimedAction::ExpireAllocation(allocation.id),
);
self.pending_commands.push_back(Command::Wake {
deadline: wake_deadline,
});
self.send_message(
refresh_success_response(effective_lifetime, message.transaction_id()),
sender,
);
Ok(())
}
fn create_new_allocation(&mut self, now: Instant, lifetime: &Lifetime) -> Allocation {
// First, find an unused port.
@@ -351,6 +416,15 @@ where
}
}
fn refresh_success_response(
effective_lifetime: Lifetime,
transaction_id: TransactionId,
) -> Message<Attribute> {
let mut message = Message::new(MessageClass::SuccessResponse, REFRESH, transaction_id);
message.add_attribute(effective_lifetime.into());
message
}
impl Server<StepRng> {
#[allow(dead_code)]
pub fn test() -> Self {
@@ -400,11 +474,12 @@ fn compute_effective_lifetime(requested_lifetime: Option<&Lifetime>) -> Lifetime
Lifetime::new(effective_lifetime).unwrap()
}
fn allocate_error_response(
fn error_response(
transaction_id: TransactionId,
method: Method,
error_code: ErrorCode,
) -> Message<Attribute> {
let mut message = Message::new(MessageClass::ErrorResponse, ALLOCATE, transaction_id);
let mut message = Message::new(MessageClass::ErrorResponse, method, transaction_id);
message.add_attribute(error_code.into());
message

View File

@@ -13,12 +13,16 @@ pub struct TimeEvents<A> {
impl<A> TimeEvents<A> {
/// Add an action to be executed at the specified time.
pub fn add(&mut self, trigger: Instant, action: A) {
///
/// Returns the new wake deadline for convenience.
pub fn add(&mut self, trigger: Instant, action: A) -> Instant {
self.events.push(TimeEvent {
time: trigger,
action,
});
self.events.sort_unstable();
self.next_trigger().expect("just pushed an event")
}
/// Remove and return all actions that are pending, given that time has advanced to `now`.

View File

@@ -1,19 +1,24 @@
use bytecodec::EncodeExt;
use hex_literal::hex;
use relay::{AllocationId, Command, Server};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use stun_codec::rfc5766::attributes::Lifetime;
use stun_codec::rfc5766::methods::REFRESH;
use stun_codec::{Attribute, Message, MessageClass, MessageEncoder, TransactionId};
#[test]
fn stun_binding_request() {
run_regression_test(&[(
Input::Client(
Input::client(
"91.141.64.64:26098",
"000100002112a4420908af7d45e8751f5092d167",
Instant::now(),
),
&[Output::SendMessage((
&[Output::send_message(
"91.141.64.64:26098",
"0101000c2112a4420908af7d45e8751f5092d16700200008000144e07a9fe402",
))],
)],
)]);
}
@@ -22,11 +27,11 @@ fn turn_allocation_request() {
let now = Instant::now();
run_regression_test(&[(
Input::Client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
Input::client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
&[
Output::Wake(now + Duration::from_secs(3600)),
Output::CreateAllocation(49152),
Output::SendMessage(("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10")),
Output::send_message("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10"),
],
)]);
}
@@ -36,11 +41,11 @@ fn deallocate_once_time_expired() {
let now = Instant::now();
run_regression_test(&[(
Input::Client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
Input::client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
&[
Output::Wake(now + Duration::from_secs(3600)),
Output::CreateAllocation(49152),
Output::SendMessage(("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10")),
Output::send_message("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10"),
],
), (
Input::Time(now + Duration::from_secs(3601)),
@@ -50,11 +55,63 @@ fn deallocate_once_time_expired() {
)]);
}
#[test]
fn when_refreshed_in_time_allocation_does_not_expire() {
let now = Instant::now();
let refreshed_at = now + Duration::from_secs(1800);
let first_expiry = now + Duration::from_secs(3600);
run_regression_test(&[(
Input::client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
&[
Output::Wake(first_expiry),
Output::CreateAllocation(49152),
Output::send_message("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10"),
],
),(
Input::client("91.141.70.157:7112", refresh_request(hex!("150ee0cb117ed3a0f66529f2"), 3600), refreshed_at),
&[
Output::Wake(first_expiry), // `first_expiry` would still happen after the refresh but it will be a no-op wake-up.
Output::send_message("91.141.70.157:7112", "010400082112a442150ee0cb117ed3a0f66529f2000d000400000e10"),
],
),(
Input::Time(first_expiry + Duration::from_secs(1)),
&[],
)]);
}
#[test]
fn when_receiving_lifetime_0_for_existing_allocation_then_delete() {
let now = Instant::now();
let refreshed_at = now + Duration::from_secs(1800);
let first_expiry = now + Duration::from_secs(3600);
run_regression_test(&[(
Input::client("91.141.70.157:7112", "000300182112a44215d4bb014ad31072cd248ec70019000411000000000d000400000e1080280004d08a7674", now),
&[
Output::Wake(first_expiry),
Output::CreateAllocation(49152),
Output::send_message("91.141.70.157:7112", "010300202112a44215d4bb014ad31072cd248ec7001600080001e112026eff670020000800013ada7a9fe2df000d000400000e10"),
],
),(
Input::client("91.141.70.157:7112", refresh_request(hex!("150ee0cb117ed3a0f66529f2"), 0), refreshed_at),
&[
Output::ExpireAllocation(49152),
Output::send_message("91.141.70.157:7112", "010400082112a442150ee0cb117ed3a0f66529f2000d000400000000"),
],
),(
Input::Time(first_expiry + Duration::from_secs(1)),
&[],
)]);
}
/// Run a regression test with a sequence events where we always have 1 input and N outputs.
fn run_regression_test(sequence: &[(Input, &[Output])]) {
let _ = env_logger::try_init();
let mut server = Server::test();
let mut allocatio_mapping = HashMap::<u16, AllocationId>::default();
let mut allocation_mapping = HashMap::<u16, AllocationId>::default();
for (input, output) in sequence {
match input {
@@ -85,17 +142,17 @@ fn run_regression_test(sequence: &[(Input, &[Output])]) {
) => {
assert_eq!(port, *expected_port);
allocatio_mapping.insert(*expected_port, id);
allocation_mapping.insert(*expected_port, id);
}
(Output::Wake(expected), Command::Wake { deadline }) => {
assert_eq!(*expected, deadline);
}
(Output::ExpireAllocation(port), Command::FreeAddresses { id }) => {
let expected_id = allocatio_mapping.remove(port).expect("unknown allocation");
let expected_id = allocation_mapping.remove(port).expect("unknown allocation");
assert_eq!(expected_id, id);
}
(expected, actual) => panic!("Unhandled events: {expected:?} and {actual:?}"),
(expected, actual) => panic!("Expected: {expected:?}\nActual: {actual:?}\n"),
}
}
@@ -103,11 +160,35 @@ fn run_regression_test(sequence: &[(Input, &[Output])]) {
}
}
fn refresh_request(transaction_id: [u8; 12], lifetime: u32) -> String {
let mut message = Message::new(
MessageClass::Request,
REFRESH,
TransactionId::new(transaction_id),
);
message.add_attribute(Lifetime::from_u32(lifetime));
message_to_hex(message)
}
fn message_to_hex<A>(message: Message<A>) -> String
where
A: Attribute,
{
hex::encode(MessageEncoder::new().encode_into_bytes(message).unwrap())
}
enum Input {
Client(Ip, Bytes, Instant),
Time(Instant),
}
impl Input {
fn client(from: Ip, data: impl AsRef<str>, now: Instant) -> Self {
Self::Client(from, data.as_ref().to_owned(), now)
}
}
#[derive(Debug)]
enum Output {
SendMessage((Ip, Bytes)),
@@ -116,5 +197,11 @@ enum Output {
ExpireAllocation(u16),
}
impl Output {
fn send_message(from: Ip, data: impl AsRef<str>) -> Self {
Self::SendMessage((from, data.as_ref().to_owned()))
}
}
type Ip = &'static str;
type Bytes = &'static str;
type Bytes = String;