mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 18:18:55 +00:00
Currently, the DNS records for the portal's hostname are only resolved during startup. When the WebSocket connection fails, we try to reconnect but only with the IPs that we have previously resolved. If the local IP stack changed since then or the hostname now points to different IPs, we will run into the reconnect-timeout configured in `phoenix-channel`. To fix this, we re-resolve the portal's hostname every time the WebSocket connection fails. For the Gateway, this is easy as we can simply reuse the already existing `TokioResolver` provided by hickory. For the Client, we need to write our own DNS client on top of our socket factory abstraction to ensure we don't create a routing loop with the resulting DNS queries. To simplify things, we only send DNS queries over UDP. Those are not guaranteed to succeed but given that we do this on every "hiccup", we already have a retry mechanism. We use the currently configured upstream DNS servers for this. Resolves: #10238
165 lines
5.2 KiB
Rust
165 lines
5.2 KiB
Rust
#![cfg_attr(test, allow(clippy::unwrap_used))]
|
|
|
|
//! Main connlib library for clients.
|
|
pub use connlib_model::StaticSecret;
|
|
pub use eventloop::DisconnectError;
|
|
pub use firezone_tunnel::TunConfig;
|
|
pub use firezone_tunnel::messages::client::{IngressMessages, ResourceDescription};
|
|
|
|
use anyhow::Result;
|
|
use connlib_model::ResourceView;
|
|
use eventloop::{Command, Eventloop};
|
|
use futures::future::Fuse;
|
|
use futures::{FutureExt, StreamExt};
|
|
use phoenix_channel::{PhoenixChannel, PublicKeyParam};
|
|
use socket_factory::{SocketFactory, TcpSocket, UdpSocket};
|
|
use std::future;
|
|
use std::net::IpAddr;
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll};
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::watch;
|
|
use tokio::task::JoinHandle;
|
|
use tokio_stream::wrappers::WatchStream;
|
|
use tun::Tun;
|
|
|
|
mod eventloop;
|
|
|
|
// NOTE(review): presumably the Phoenix channel topic the client joins on the portal.
// It is not referenced anywhere in this file, so it is likely consumed by the
// `eventloop` submodule — confirm before renaming or removing.
const PHOENIX_TOPIC: &str = "client";
|
|
|
|
/// A session is the entry-point for connlib, maintains the runtime and the tunnel.
|
|
///
|
|
/// A session is created using [`Session::connect`].
|
|
/// To stop the session, simply drop this struct.
|
|
#[derive(Clone, Debug)]
|
|
pub struct Session {
|
|
channel: mpsc::UnboundedSender<Command>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct EventStream {
|
|
eventloop: Fuse<JoinHandle<Result<(), DisconnectError>>>,
|
|
resource_list_receiver: WatchStream<Vec<ResourceView>>,
|
|
tun_config_receiver: WatchStream<Option<TunConfig>>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum Event {
|
|
TunInterfaceUpdated(TunConfig),
|
|
ResourcesUpdated(Vec<ResourceView>),
|
|
Disconnected(DisconnectError),
|
|
}
|
|
|
|
impl Session {
|
|
pub fn connect(
|
|
tcp_socket_factory: Arc<dyn SocketFactory<TcpSocket>>,
|
|
udp_socket_factory: Arc<dyn SocketFactory<UdpSocket>>,
|
|
portal: PhoenixChannel<(), IngressMessages, PublicKeyParam>,
|
|
is_internet_resource_active: bool,
|
|
handle: tokio::runtime::Handle,
|
|
) -> (Self, EventStream) {
|
|
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
|
|
|
// Use `watch` channels for resource list and TUN config because we are only ever interested in the last value and don't care about intermediate updates.
|
|
let (tun_config_sender, tun_config_receiver) = watch::channel(None);
|
|
let (resource_list_sender, resource_list_receiver) = watch::channel(Vec::default());
|
|
|
|
let eventloop = handle.spawn(
|
|
Eventloop::new(
|
|
tcp_socket_factory,
|
|
udp_socket_factory,
|
|
is_internet_resource_active,
|
|
portal,
|
|
cmd_rx,
|
|
resource_list_sender,
|
|
tun_config_sender,
|
|
)
|
|
.run(),
|
|
);
|
|
|
|
(
|
|
Self { channel: cmd_tx },
|
|
EventStream {
|
|
eventloop: eventloop.fuse(),
|
|
resource_list_receiver: WatchStream::from_changes(resource_list_receiver),
|
|
tun_config_receiver: WatchStream::from_changes(tun_config_receiver),
|
|
},
|
|
)
|
|
}
|
|
|
|
/// Reset a [`Session`].
|
|
///
|
|
/// Resetting a session will:
|
|
///
|
|
/// - Close and re-open a connection to the portal.
|
|
/// - Delete all allocations.
|
|
/// - Rebind local UDP sockets.
|
|
pub fn reset(&self, reason: String) {
|
|
let _ = self.channel.send(Command::Reset(reason));
|
|
}
|
|
|
|
/// Sets a new set of upstream DNS servers for this [`Session`].
|
|
///
|
|
/// Changing the DNS servers clears all cached DNS requests which may be disruptive to the UX.
|
|
/// Clients should only call this when relevant.
|
|
///
|
|
/// The implementation is idempotent; calling it with the same set of servers is safe.
|
|
pub fn set_dns(&self, new_dns: Vec<IpAddr>) {
|
|
let _ = self.channel.send(Command::SetDns(new_dns));
|
|
}
|
|
|
|
pub fn set_internet_resource_state(&self, active: bool) {
|
|
let _ = self.channel.send(Command::SetInternetResourceState(active));
|
|
}
|
|
|
|
/// Sets a new [`Tun`] device handle.
|
|
pub fn set_tun(&self, new_tun: Box<dyn Tun>) {
|
|
let _ = self.channel.send(Command::SetTun(new_tun));
|
|
}
|
|
|
|
pub fn stop(&self) {
|
|
let _ = self.channel.send(Command::Stop);
|
|
}
|
|
}
|
|
|
|
impl EventStream {
|
|
pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Event>> {
|
|
match self.eventloop.poll_unpin(cx) {
|
|
Poll::Ready(Ok(Ok(()))) => return Poll::Ready(None),
|
|
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Event::Disconnected(e))),
|
|
Poll::Ready(Err(e)) => {
|
|
return Poll::Ready(Some(Event::Disconnected(DisconnectError::from(
|
|
anyhow::Error::new(e).context("connlib crashed"),
|
|
))));
|
|
}
|
|
Poll::Pending => {}
|
|
}
|
|
|
|
if let Poll::Ready(Some(resources)) = self.resource_list_receiver.poll_next_unpin(cx) {
|
|
return Poll::Ready(Some(Event::ResourcesUpdated(resources)));
|
|
}
|
|
|
|
if let Poll::Ready(Some(Some(config))) = self.tun_config_receiver.poll_next_unpin(cx) {
|
|
return Poll::Ready(Some(Event::TunInterfaceUpdated(config)));
|
|
}
|
|
|
|
Poll::Pending
|
|
}
|
|
|
|
pub async fn next(&mut self) -> Option<Event> {
|
|
future::poll_fn(|cx| self.poll_next(cx)).await
|
|
}
|
|
|
|
pub async fn drain(&mut self) -> Vec<Event> {
|
|
futures::stream::poll_fn(|cx| self.poll_next(cx))
|
|
.collect()
|
|
.await
|
|
}
|
|
}
|
|
|
|
impl Drop for Session {
|
|
fn drop(&mut self) {
|
|
tracing::debug!("`Session` dropped")
|
|
}
|
|
}
|