connlib: Connection mock (#1721)

Resolves firezone/product#607

Setting the env var `CONNLIB_MOCK` when building through either
`build-rust.sh` or `gradle` will activate the `mock` feature.
This commit is contained in:
Francesca Lovebloom
2023-07-03 11:48:55 -07:00
committed by GitHub
parent 9deae3653a
commit d0a8333976
8 changed files with 123 additions and 60 deletions

View File

@@ -10,6 +10,8 @@ anything released here until this notice is removed. You have been warned.
## Building Connlib
Setting the `CONNLIB_MOCK` environment variable when packaging for Apple or Android will activate the `mock` feature flag, replacing connlib's normal connection logic with a mock for testing purposes.
1. You'll need a Rust toolchain installed if you don't have one already. We
recommend following the instructions at https://rustup.rs.
1. `rustup show` will install all needed targets since they are added to `rust-toolchain.toml`.

View File

@@ -3,6 +3,9 @@ name = "connlib-android"
version = "0.1.6"
edition = "2021"
[features]
mock = ["firezone-client-connlib/mock"]
[dependencies]
jni = { version = "0.21.1", features = ["invocation"] }
firezone-client-connlib = { path = "../../libs/client" }

View File

@@ -85,6 +85,11 @@ cargo {
module = "../"
libname = "connlib"
targets = listOf("arm", "arm64", "x86", "x86_64")
features {
if (System.getenv("CONNLIB_MOCK") != null) {
defaultAnd(listOf("mock").toTypedArray())
}
}
}
tasks.whenTaskAdded {

View File

@@ -3,6 +3,9 @@ name = "connlib-apple"
version = "0.1.6"
edition = "2021"
[features]
mock = ["firezone-client-connlib/mock"]
[build-dependencies]
anyhow = "1.0.71"
diva = "0.1.0"

View File

@@ -45,16 +45,20 @@ else
fi
fi
if [[ -n "$CONNLIB_MOCK" ]]; then
LIPO_ARGS="--features mock"
fi
# if [ $ENABLE_PREVIEWS == "NO" ]; then
if [[ $CONFIGURATION == "Release" ]]; then
echo "BUILDING FOR RELEASE ($TARGETS)"
cargo lipo --release --manifest-path ./Cargo.toml --targets $TARGETS
cargo lipo --release --manifest-path ./Cargo.toml --targets $TARGETS $LIPO_ARGS
else
echo "BUILDING FOR DEBUG ($TARGETS)"
cargo lipo --manifest-path ./Cargo.toml --targets $TARGETS
cargo lipo --manifest-path ./Cargo.toml --targets $TARGETS $LIPO_ARGS
fi
# else

View File

@@ -3,6 +3,9 @@ name = "firezone-client-connlib"
version = "0.1.0"
edition = "2021"
[features]
mock = ["libs-common/mock"]
[dependencies]
tokio = { version = "1.27", default-features = false, features = ["sync"] }
tracing = { version = "0.1", default-features = false, features = ["std", "attributes"] }

View File

@@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
mock = []
jni-bindings = ["boringtun/jni-bindings"]
[dependencies]

View File

@@ -6,14 +6,21 @@ use rand_core::OsRng;
use std::{
marker::PhantomData,
net::{Ipv4Addr, Ipv6Addr},
time::Duration,
};
use tokio::{
runtime::Runtime,
sync::mpsc::{Receiver, Sender},
};
use url::Url;
use uuid::Uuid;
use crate::{control::PhoenixChannel, error_type::ErrorType, messages::Key, Error, Result};
use crate::{
control::PhoenixChannel,
error_type::ErrorType,
messages::{Key, ResourceDescription, ResourceDescriptionCidr},
Error, Result,
};
// TODO: Not the most tidy trait for a control-plane.
/// Trait that represents a control-plane.
@@ -124,63 +131,11 @@ where
.enable_all()
.build()?;
runtime.spawn(async move {
let private_key = StaticSecret::random_from_rng(OsRng);
let self_id = uuid::Uuid::new_v4();
let name_suffix: String = thread_rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect();
let connect_url = fatal_error!(get_websocket_path(portal_url, token, T::socket_path(), &Key(PublicKey::from(&private_key).to_bytes()), &self_id.to_string(), &name_suffix), callbacks);
let (sender, mut receiver) = fatal_error!(T::start(private_key, callbacks.clone()).await, callbacks);
let mut connection = PhoenixChannel::<_, U, R, M>::new(connect_url, move |msg| {
let sender = sender.clone();
async move {
tracing::trace!("Received message: {msg:?}");
if let Err(e) = sender.send(msg).await {
tracing::warn!("Received a message after handler already closed: {e}. Probably message received during session clean up.");
}
}
});
// Used to send internal messages
let mut internal_sender = connection.sender();
let topic = T::socket_path().to_string();
let topic_send = topic.clone();
tokio::spawn(async move {
let mut exponential_backoff = ExponentialBackoffBuilder::default().build();
loop {
let result = connection.start(vec![topic.clone()]).await;
if let Some(t) = exponential_backoff.next_backoff() {
tracing::warn!("Error during connection to the portal, retrying in {} seconds", t.as_secs());
match result {
Ok(()) => callbacks.on_error(&tokio_tungstenite::tungstenite::Error::ConnectionClosed.into(), ErrorType::Recoverable),
Err(e) => callbacks.on_error(&e, ErrorType::Recoverable)
}
tokio::time::sleep(t).await;
} else {
tracing::error!("Connection to the portal error, check your internet or the status of the portal.\nDisconnecting interface.");
match result {
Ok(()) => callbacks.on_error(&crate::Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed), ErrorType::Fatal),
Err(e) => callbacks.on_error(&e, ErrorType::Fatal)
}
break;
}
}
});
// TODO: Implement Sink for PhoenixEvent (created from a PhoenixSender event + topic)
// that way we can simply do receiver.forward(sender)
tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
if let Err(err) = internal_sender.send(&topic_send, message).await {
tracing::error!("Channel already closed when trying to send message: {err}. Probably trying to send a message during session clean up.");
}
}
});
});
if cfg!(feature = "mock") {
Self::connect_mock(callbacks);
} else {
Self::connect_inner(&runtime, portal_url, token, callbacks);
}
Ok(Self {
runtime: Some(runtime),
@@ -188,6 +143,93 @@ where
})
}
fn connect_inner(runtime: &Runtime, portal_url: Url, token: String, callbacks: CB) {
runtime.spawn(async move {
let private_key = StaticSecret::random_from_rng(OsRng);
let self_id = Uuid::new_v4();
let name_suffix: String = thread_rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect();
let connect_url = fatal_error!(get_websocket_path(portal_url, token, T::socket_path(), &Key(PublicKey::from(&private_key).to_bytes()), &self_id.to_string(), &name_suffix), callbacks);
let (sender, mut receiver) = fatal_error!(T::start(private_key, callbacks.clone()).await, callbacks);
let mut connection = PhoenixChannel::<_, U, R, M>::new(connect_url, move |msg| {
let sender = sender.clone();
async move {
tracing::trace!("Received message: {msg:?}");
if let Err(e) = sender.send(msg).await {
tracing::warn!("Received a message after handler already closed: {e}. Probably message received during session clean up.");
}
}
});
// Used to send internal messages
let mut internal_sender = connection.sender();
let topic = T::socket_path().to_string();
let topic_send = topic.clone();
tokio::spawn(async move {
let mut exponential_backoff = ExponentialBackoffBuilder::default().build();
loop {
let result = connection.start(vec![topic.clone()]).await;
if let Some(t) = exponential_backoff.next_backoff() {
tracing::warn!("Error during connection to the portal, retrying in {} seconds", t.as_secs());
match result {
Ok(()) => callbacks.on_error(&tokio_tungstenite::tungstenite::Error::ConnectionClosed.into(), ErrorType::Recoverable),
Err(e) => callbacks.on_error(&e, ErrorType::Recoverable)
}
tokio::time::sleep(t).await;
} else {
tracing::error!("Connection to the portal error, check your internet or the status of the portal.\nDisconnecting interface.");
match result {
Ok(()) => callbacks.on_error(&crate::Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed), ErrorType::Fatal),
Err(e) => callbacks.on_error(&e, ErrorType::Fatal)
}
break;
}
}
});
// TODO: Implement Sink for PhoenixEvent (created from a PhoenixSender event + topic)
// that way we can simply do receiver.forward(sender)
tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
if let Err(err) = internal_sender.send(&topic_send, message).await {
tracing::error!("Channel already closed when trying to send message: {err}. Probably trying to send a message during session clean up.");
}
}
});
});
}
fn connect_mock(callbacks: CB) {
std::thread::sleep(Duration::from_secs(1));
callbacks.on_connect(TunnelAddresses {
address4: "100.100.111.2".parse().unwrap(),
address6: "fd00:0222:2021:1111::2".parse().unwrap(),
});
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(3));
callbacks.on_update_resources(ResourceList {
resources: vec![
serde_json::to_string(&ResourceDescription::Cidr(ResourceDescriptionCidr {
id: Uuid::new_v4(),
address: "8.8.4.4".parse::<Ipv4Addr>().unwrap().into(),
name: "Google Public DNS IPv4".to_string(),
}))
.unwrap(),
serde_json::to_string(&ResourceDescription::Cidr(ResourceDescriptionCidr {
id: Uuid::new_v4(),
address: "2001:4860:4860::8844".parse::<Ipv6Addr>().unwrap().into(),
name: "Google Public DNS IPv6".to_string(),
}))
.unwrap(),
],
});
});
}
/// Cleanup a [Session].
///
/// For now this just drops the runtime, which should drop all pending tasks.