mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
refactor(connlib): move client phoenix-channel to separate task (#10210)
Currently, `connlib`'s event-loop for clients uses manual polling to advance the state of the tunnel and the phoenix-channel. Manual polling is powerful but also easy to get wrong, resulting in task-wakeup bugs. Additionally, if the tunnel is very busy with processing packets, the phoenix-channel may not get enough CPU time, resulting in a loss of the WebSocket connection. To fix this, we move the phoenix-channel to a separate task and use channels to connect it with `connlib`'s main event-loop. This one is now primarily focused on advancing the tunnel state, effectively offloading the problem of fair scheduling to the tokio runtime. Related: #10003
This commit is contained in:
1
rust/Cargo.lock
generated
1
rust/Cargo.lock
generated
@@ -1347,6 +1347,7 @@ dependencies = [
|
||||
"dns-types",
|
||||
"firezone-logging",
|
||||
"firezone-tunnel",
|
||||
"futures",
|
||||
"ip_network",
|
||||
"libc",
|
||||
"phoenix-channel",
|
||||
|
||||
@@ -12,6 +12,7 @@ connlib-model = { workspace = true }
|
||||
dns-types = { workspace = true }
|
||||
firezone-logging = { workspace = true }
|
||||
firezone-tunnel = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
ip_network = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
phoenix-channel = { workspace = true }
|
||||
|
||||
@@ -7,11 +7,12 @@ use firezone_tunnel::messages::client::{
|
||||
EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates,
|
||||
GatewaysIceCandidates, IngressMessages, InitClient,
|
||||
};
|
||||
use firezone_tunnel::{ClientTunnel, IpConfig};
|
||||
use firezone_tunnel::{ClientEvent, ClientTunnel, IpConfig, TunConfig};
|
||||
use ip_network::{Ipv4Network, Ipv6Network};
|
||||
use phoenix_channel::{ErrorReply, OutboundRequestId, PhoenixChannel, PublicKeyParam};
|
||||
use std::mem;
|
||||
use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam};
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::ops::ControlFlow;
|
||||
use std::pin::pin;
|
||||
use std::time::Instant;
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
@@ -19,15 +20,18 @@ use std::{
|
||||
net::IpAddr,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use std::{future, mem};
|
||||
use tokio::sync::mpsc;
|
||||
use tun::Tun;
|
||||
|
||||
pub struct Eventloop {
|
||||
tunnel: ClientTunnel,
|
||||
|
||||
portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
|
||||
cmd_rx: tokio::sync::mpsc::UnboundedReceiver<Command>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
cmd_rx: mpsc::UnboundedReceiver<Command>,
|
||||
event_tx: mpsc::Sender<Event>,
|
||||
|
||||
portal_event_rx: mpsc::Receiver<PortalEvent>,
|
||||
portal_cmd_tx: mpsc::Sender<PortalCommand>,
|
||||
|
||||
logged_permission_denied: bool,
|
||||
}
|
||||
@@ -54,6 +58,33 @@ pub enum Event {
|
||||
Disconnected(DisconnectError),
|
||||
}
|
||||
|
||||
impl Event {
|
||||
fn tun_interface_updated(config: TunConfig) -> Self {
|
||||
Self::TunInterfaceUpdated {
|
||||
ipv4: config.ip.v4,
|
||||
ipv6: config.ip.v6,
|
||||
dns: config.dns_by_sentinel.left_values().copied().collect(),
|
||||
search_domain: config.search_domain,
|
||||
ipv4_routes: Vec::from_iter(config.ipv4_routes),
|
||||
ipv6_routes: Vec::from_iter(config.ipv6_routes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum PortalCommand {
|
||||
Connect(PublicKeyParam),
|
||||
Send(EgressMessages),
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::large_enum_variant,
|
||||
reason = "This type is only sent through a channel so the stack-size doesn't matter much."
|
||||
)]
|
||||
enum PortalEvent {
|
||||
Received(IngressMessages),
|
||||
Error(phoenix_channel::Error),
|
||||
}
|
||||
|
||||
/// Unified error type to use across connlib.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("{0:#}")]
|
||||
@@ -79,216 +110,193 @@ impl Eventloop {
|
||||
pub(crate) fn new(
|
||||
tunnel: ClientTunnel,
|
||||
mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
|
||||
cmd_rx: tokio::sync::mpsc::UnboundedReceiver<Command>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
cmd_rx: mpsc::UnboundedReceiver<Command>,
|
||||
event_tx: mpsc::Sender<Event>,
|
||||
) -> Self {
|
||||
let (portal_event_tx, portal_event_rx) = mpsc::channel(128);
|
||||
let (portal_cmd_tx, portal_cmd_rx) = mpsc::channel(128);
|
||||
|
||||
portal.connect(PublicKeyParam(tunnel.public_key().to_bytes()));
|
||||
|
||||
tokio::spawn(phoenix_channel_event_loop(
|
||||
portal,
|
||||
portal_event_tx,
|
||||
portal_cmd_rx,
|
||||
));
|
||||
|
||||
Self {
|
||||
tunnel,
|
||||
portal,
|
||||
cmd_rx,
|
||||
event_tx,
|
||||
logged_permission_denied: false,
|
||||
portal_event_rx,
|
||||
portal_cmd_tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum CombinedEvent {
|
||||
Command(Option<Command>),
|
||||
Tunnel(Result<ClientEvent>),
|
||||
Portal(Option<PortalEvent>),
|
||||
}
|
||||
|
||||
impl Eventloop {
|
||||
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
pub async fn run(mut self) -> Result<()> {
|
||||
loop {
|
||||
match self.cmd_rx.poll_recv(cx) {
|
||||
Poll::Ready(None | Some(Command::Stop)) => return Poll::Ready(Ok(())),
|
||||
Poll::Ready(Some(Command::SetDns(dns))) => {
|
||||
self.tunnel.state_mut().update_system_resolvers(dns);
|
||||
|
||||
continue;
|
||||
match future::poll_fn(|cx| self.next_event(cx)).await {
|
||||
CombinedEvent::Command(None) => return Ok(()),
|
||||
CombinedEvent::Command(Some(cmd)) => {
|
||||
match self.handle_eventloop_command(cmd).await? {
|
||||
ControlFlow::Continue(()) => {}
|
||||
ControlFlow::Break(()) => return Ok(()),
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Command::SetDisabledResources(resources))) => {
|
||||
self.tunnel.state_mut().set_disabled_resources(resources);
|
||||
continue;
|
||||
CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?,
|
||||
CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?,
|
||||
CombinedEvent::Portal(Some(PortalEvent::Received(msg))) => {
|
||||
self.handle_portal_message(msg).await?;
|
||||
}
|
||||
Poll::Ready(Some(Command::SetTun(tun))) => {
|
||||
self.tunnel.set_tun(tun);
|
||||
continue;
|
||||
CombinedEvent::Portal(Some(PortalEvent::Error(e))) => {
|
||||
return Err(e).context("Connection to portal failed");
|
||||
}
|
||||
Poll::Ready(Some(Command::Reset(reason))) => {
|
||||
self.tunnel.reset(&reason);
|
||||
self.portal
|
||||
.connect(PublicKeyParam(self.tunnel.public_key().to_bytes()));
|
||||
|
||||
continue;
|
||||
CombinedEvent::Portal(None) => {
|
||||
return Err(anyhow::Error::msg("portal task exited unexpectedly"));
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
match self.tunnel.poll_next_event(cx) {
|
||||
Poll::Ready(Ok(event)) => {
|
||||
let Some(e) = self.handle_tunnel_event(event) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match self.event_tx.try_send(e) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::debug!("Event receiver dropped, exiting event loop");
|
||||
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
tracing::warn!("App cannot keep up with connlib events, dropping");
|
||||
}
|
||||
};
|
||||
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(is_unreachable)
|
||||
{
|
||||
tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Invalid Input can be all sorts of things but we mostly see it with unreachable addresses.
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput)
|
||||
{
|
||||
tracing::debug!("{e:#}");
|
||||
continue;
|
||||
}
|
||||
|
||||
if e.root_cause()
|
||||
.is::<firezone_tunnel::UdpSocketThreadStopped>()
|
||||
{
|
||||
return Poll::Ready(Err(e));
|
||||
}
|
||||
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied)
|
||||
{
|
||||
if !mem::replace(&mut self.logged_permission_denied, true) {
|
||||
tracing::info!(
|
||||
"Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic."
|
||||
)
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::warn!("Tunnel error: {e:#}");
|
||||
continue;
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
match self.portal.poll(cx) {
|
||||
Poll::Ready(result) => {
|
||||
let event = result.context("connection to the portal failed")?;
|
||||
self.handle_portal_event(event);
|
||||
continue;
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_tunnel_event(&mut self, event: firezone_tunnel::ClientEvent) -> Option<Event> {
|
||||
async fn handle_eventloop_command(&mut self, command: Command) -> Result<ControlFlow<(), ()>> {
|
||||
match command {
|
||||
Command::Stop => return Ok(ControlFlow::Break(())),
|
||||
Command::SetDns(dns) => self.tunnel.state_mut().update_system_resolvers(dns),
|
||||
Command::SetDisabledResources(resources) => {
|
||||
self.tunnel.state_mut().set_disabled_resources(resources)
|
||||
}
|
||||
Command::SetTun(tun) => {
|
||||
self.tunnel.set_tun(tun);
|
||||
}
|
||||
Command::Reset(reason) => {
|
||||
self.tunnel.reset(&reason);
|
||||
self.portal_cmd_tx
|
||||
.send(PortalCommand::Connect(PublicKeyParam(
|
||||
self.tunnel.public_key().to_bytes(),
|
||||
)))
|
||||
.await
|
||||
.context("Failed to connect phoenix-channel")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
async fn handle_tunnel_event(&mut self, event: ClientEvent) -> Result<()> {
|
||||
match event {
|
||||
firezone_tunnel::ClientEvent::AddedIceCandidates {
|
||||
ClientEvent::AddedIceCandidates {
|
||||
conn_id: gid,
|
||||
candidates,
|
||||
} => {
|
||||
tracing::debug!(%gid, ?candidates, "Sending new ICE candidates to gateway");
|
||||
|
||||
self.portal.send(
|
||||
PHOENIX_TOPIC,
|
||||
EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates {
|
||||
gateway_ids: vec![gid],
|
||||
candidates,
|
||||
}),
|
||||
);
|
||||
|
||||
None
|
||||
self.portal_cmd_tx
|
||||
.send(PortalCommand::Send(EgressMessages::BroadcastIceCandidates(
|
||||
GatewaysIceCandidates {
|
||||
gateway_ids: vec![gid],
|
||||
candidates,
|
||||
},
|
||||
)))
|
||||
.await
|
||||
.context("Failed to send message to portal")?;
|
||||
}
|
||||
firezone_tunnel::ClientEvent::RemovedIceCandidates {
|
||||
ClientEvent::RemovedIceCandidates {
|
||||
conn_id: gid,
|
||||
candidates,
|
||||
} => {
|
||||
tracing::debug!(%gid, ?candidates, "Sending invalidated ICE candidates to gateway");
|
||||
|
||||
self.portal.send(
|
||||
PHOENIX_TOPIC,
|
||||
EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates {
|
||||
gateway_ids: vec![gid],
|
||||
candidates,
|
||||
}),
|
||||
);
|
||||
|
||||
None
|
||||
self.portal_cmd_tx
|
||||
.send(PortalCommand::Send(
|
||||
EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates {
|
||||
gateway_ids: vec![gid],
|
||||
candidates,
|
||||
}),
|
||||
))
|
||||
.await
|
||||
.context("Failed to send message to portal")?;
|
||||
}
|
||||
firezone_tunnel::ClientEvent::ConnectionIntent {
|
||||
ClientEvent::ConnectionIntent {
|
||||
connected_gateway_ids,
|
||||
resource,
|
||||
} => {
|
||||
self.portal.send(
|
||||
PHOENIX_TOPIC,
|
||||
EgressMessages::CreateFlow {
|
||||
self.portal_cmd_tx
|
||||
.send(PortalCommand::Send(EgressMessages::CreateFlow {
|
||||
resource_id: resource,
|
||||
connected_gateway_ids,
|
||||
},
|
||||
);
|
||||
|
||||
None
|
||||
}))
|
||||
.await
|
||||
.context("Failed to send message to portal")?;
|
||||
}
|
||||
firezone_tunnel::ClientEvent::ResourcesChanged { resources } => {
|
||||
Some(Event::ResourcesUpdated(resources))
|
||||
ClientEvent::ResourcesChanged { resources } => {
|
||||
self.event_tx
|
||||
.send(Event::ResourcesUpdated(resources))
|
||||
.await
|
||||
.context("Failed to emit event")?;
|
||||
}
|
||||
firezone_tunnel::ClientEvent::TunInterfaceUpdated(config) => {
|
||||
Some(Event::TunInterfaceUpdated {
|
||||
ipv4: config.ip.v4,
|
||||
ipv6: config.ip.v6,
|
||||
dns: config.dns_by_sentinel.left_values().copied().collect(),
|
||||
search_domain: config.search_domain,
|
||||
ipv4_routes: Vec::from_iter(config.ipv4_routes),
|
||||
ipv6_routes: Vec::from_iter(config.ipv6_routes),
|
||||
})
|
||||
ClientEvent::TunInterfaceUpdated(config) => {
|
||||
self.event_tx
|
||||
.send(Event::tun_interface_updated(config))
|
||||
.await
|
||||
.context("Failed to emit event")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_portal_event(&mut self, event: phoenix_channel::Event<IngressMessages, ()>) {
|
||||
match event {
|
||||
phoenix_channel::Event::InboundMessage { msg, .. } => {
|
||||
self.handle_portal_inbound_message(msg);
|
||||
}
|
||||
phoenix_channel::Event::SuccessResponse { res: (), .. } => {}
|
||||
phoenix_channel::Event::ErrorResponse { res, req_id, topic } => {
|
||||
self.handle_portal_error_reply(res, topic, req_id);
|
||||
}
|
||||
phoenix_channel::Event::HeartbeatSent => {}
|
||||
phoenix_channel::Event::JoinedRoom { .. } => {}
|
||||
phoenix_channel::Event::Closed => {
|
||||
unimplemented!("Client never actively closes the portal connection")
|
||||
}
|
||||
phoenix_channel::Event::Hiccup {
|
||||
backoff,
|
||||
max_elapsed_time,
|
||||
error,
|
||||
} => tracing::info!(
|
||||
?backoff,
|
||||
?max_elapsed_time,
|
||||
"Hiccup in portal connection: {error:#}"
|
||||
),
|
||||
fn handle_tunnel_error(&mut self, e: anyhow::Error) -> Result<()> {
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(is_unreachable)
|
||||
{
|
||||
tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Invalid Input can be all sorts of things but we mostly see it with unreachable addresses.
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput)
|
||||
{
|
||||
tracing::debug!("{e:#}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if e.root_cause()
|
||||
.is::<firezone_tunnel::UdpSocketThreadStopped>()
|
||||
{
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if e.root_cause()
|
||||
.downcast_ref::<io::Error>()
|
||||
.is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied)
|
||||
{
|
||||
if !mem::replace(&mut self.logged_permission_denied, true) {
|
||||
tracing::info!(
|
||||
"Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic."
|
||||
)
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::warn!("Tunnel error: {e:#}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_portal_inbound_message(&mut self, msg: IngressMessages) {
|
||||
async fn handle_portal_message(&mut self, msg: IngressMessages) -> Result<()> {
|
||||
match msg {
|
||||
IngressMessages::ConfigChanged(config) => self
|
||||
.tunnel
|
||||
@@ -377,8 +385,12 @@ impl Eventloop {
|
||||
);
|
||||
|
||||
// Re-connecting to the portal means we will receive another `init` and thus new TURN servers.
|
||||
self.portal
|
||||
.connect(PublicKeyParam(self.tunnel.public_key().to_bytes()));
|
||||
self.portal_cmd_tx
|
||||
.send(PortalCommand::Connect(PublicKeyParam(
|
||||
self.tunnel.public_key().to_bytes(),
|
||||
)))
|
||||
.await
|
||||
.context("Failed to connect phoenix-channel")?;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to request new connection: {e:#}");
|
||||
@@ -396,23 +408,92 @@ impl Eventloop {
|
||||
tracing::debug!("Failed to create flow: {reason:?}")
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_portal_error_reply(
|
||||
&mut self,
|
||||
res: ErrorReply,
|
||||
topic: String,
|
||||
req_id: OutboundRequestId,
|
||||
) {
|
||||
match res {
|
||||
ErrorReply::Disabled => {
|
||||
tracing::debug!(%req_id, "Functionality is disabled");
|
||||
fn next_event(&mut self, cx: &mut Context) -> Poll<CombinedEvent> {
|
||||
if let Poll::Ready(cmd) = self.cmd_rx.poll_recv(cx) {
|
||||
return Poll::Ready(CombinedEvent::Command(cmd));
|
||||
}
|
||||
|
||||
if let Poll::Ready(event) = self.portal_event_rx.poll_recv(cx) {
|
||||
return Poll::Ready(CombinedEvent::Portal(event));
|
||||
}
|
||||
|
||||
if let Poll::Ready(event) = self.tunnel.poll_next_event(cx) {
|
||||
return Poll::Ready(CombinedEvent::Tunnel(event));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
async fn phoenix_channel_event_loop(
|
||||
mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
|
||||
event_tx: mpsc::Sender<PortalEvent>,
|
||||
mut cmd_rx: mpsc::Receiver<PortalCommand>,
|
||||
) {
|
||||
use futures::future::Either;
|
||||
use futures::future::select;
|
||||
use std::future::poll_fn;
|
||||
|
||||
loop {
|
||||
match select(poll_fn(|cx| portal.poll(cx)), pin!(cmd_rx.recv())).await {
|
||||
Either::Left((Ok(phoenix_channel::Event::InboundMessage { msg, .. }), _)) => {
|
||||
if event_tx.send(PortalEvent::Received(msg)).await.is_err() {
|
||||
tracing::debug!("Event channel closed: exiting phoenix-channel event-loop");
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
ErrorReply::UnmatchedTopic => {
|
||||
self.portal.join(topic, ());
|
||||
Either::Left((Ok(phoenix_channel::Event::SuccessResponse { res: (), .. }), _)) => {}
|
||||
Either::Left((Ok(phoenix_channel::Event::ErrorResponse { res, req_id, topic }), _)) => {
|
||||
match res {
|
||||
ErrorReply::Disabled => {
|
||||
tracing::debug!(%req_id, "Functionality is disabled");
|
||||
}
|
||||
ErrorReply::UnmatchedTopic => {
|
||||
portal.join(topic, ());
|
||||
}
|
||||
reason @ (ErrorReply::InvalidVersion | ErrorReply::Other) => {
|
||||
tracing::debug!(%req_id, %reason, "Request failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
reason @ (ErrorReply::InvalidVersion | ErrorReply::Other) => {
|
||||
tracing::debug!(%req_id, %reason, "Request failed");
|
||||
Either::Left((Ok(phoenix_channel::Event::HeartbeatSent), _)) => {}
|
||||
Either::Left((Ok(phoenix_channel::Event::JoinedRoom { .. }), _)) => {}
|
||||
Either::Left((Ok(phoenix_channel::Event::Closed), _)) => {
|
||||
unimplemented!("Client never actively closes the portal connection")
|
||||
}
|
||||
Either::Left((
|
||||
Ok(phoenix_channel::Event::Hiccup {
|
||||
backoff,
|
||||
max_elapsed_time,
|
||||
error,
|
||||
}),
|
||||
_,
|
||||
)) => tracing::info!(
|
||||
?backoff,
|
||||
?max_elapsed_time,
|
||||
"Hiccup in portal connection: {error:#}"
|
||||
),
|
||||
Either::Left((Err(e), _)) => {
|
||||
if event_tx.send(PortalEvent::Error(e)).await.is_err() {
|
||||
tracing::debug!("Event channel closed: exiting phoenix-channel event-loop");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Either::Right((Some(PortalCommand::Send(msg)), _)) => {
|
||||
portal.send(PHOENIX_TOPIC, msg);
|
||||
}
|
||||
Either::Right((Some(PortalCommand::Connect(param)), _)) => {
|
||||
portal.connect(param);
|
||||
}
|
||||
Either::Right((None, _)) => {
|
||||
tracing::debug!("Command channel closed: exiting phoenix-channel event-loop");
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,10 +138,14 @@ async fn connect(
|
||||
cmd_rx: UnboundedReceiver<Command>,
|
||||
event_tx: Sender<Event>,
|
||||
) -> Result<()> {
|
||||
let tunnel = ClientTunnel::new(tcp_socket_factory, udp_socket_factory);
|
||||
let mut eventloop = Eventloop::new(tunnel, portal, cmd_rx, event_tx);
|
||||
|
||||
std::future::poll_fn(|cx| eventloop.poll(cx)).await?;
|
||||
Eventloop::new(
|
||||
ClientTunnel::new(tcp_socket_factory, udp_socket_factory),
|
||||
portal,
|
||||
cmd_rx,
|
||||
event_tx,
|
||||
)
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user