From e169150ee709a7c9930dba6cb1d31308496a8098 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 4 Apr 2024 10:24:38 +1100
Subject: [PATCH] fix(gateway): don't erroneously suspend eventloop (#4486)

Within the gateway's eventloop, we MUST only return `Poll::Pending` if
`Waker`s are registered for everything that still needs to happen. To
ensure that, we MUST `loop` around the calls to `poll()` so that we
drain everything that is `Poll::Ready`. Only once all sub-state
machines return `Poll::Pending` can we return `Poll::Pending`.

A minimal sketch of this polling pattern follows the patch.

---
 rust/gateway/src/eventloop.rs | 168 ++++++++++++++++++----------------
 1 file changed, 91 insertions(+), 77 deletions(-)

diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs
index beba22a3a..dc5d9c213 100644
--- a/rust/gateway/src/eventloop.rs
+++ b/rust/gateway/src/eventloop.rs
@@ -24,7 +24,7 @@ pub const PHOENIX_TOPIC: &str = "gateway";
 
 pub struct Eventloop {
     tunnel: GatewayTunnel,
-    portal: PhoenixChannel<(), IngressMessages, EgressMessages>,
+    portal: PhoenixChannel<(), IngressMessages, ()>,
 
     resolve_tasks:
         futures_bounded::FuturesTupleSet<Vec<IpAddr>, Either<RequestConnection, AllowAccess>>,
@@ -33,7 +33,7 @@ impl Eventloop {
 
     pub(crate) fn new(
         tunnel: GatewayTunnel,
-        portal: PhoenixChannel<(), IngressMessages, EgressMessages>,
+        portal: PhoenixChannel<(), IngressMessages, ()>,
     ) -> Self {
         Self {
             tunnel,
@@ -47,18 +47,8 @@ impl Eventloop {
     pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Infallible>> {
         loop {
             match self.tunnel.poll_next_event(cx) {
-                Poll::Ready(Ok(firezone_tunnel::GatewayEvent::SignalIceCandidate {
-                    conn_id: client,
-                    candidate,
-                })) => {
-                    self.portal.send(
-                        PHOENIX_TOPIC,
-                        EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates {
-                            client_ids: vec![client],
-                            candidates: vec![candidate],
-                        }),
-                    );
-
+                Poll::Ready(Ok(event)) => {
+                    self.handle_tunnel_event(event);
                     continue;
                 }
                 Poll::Ready(Err(e)) => {
@@ -75,82 +65,106 @@
                 }
                 Poll::Ready((result, Either::Right(req))) => {
                     self.allow_access(result, req);
-
                     continue;
                 }
                 Poll::Pending => {}
             }
+
             match self.portal.poll(cx)? {
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::RequestConnection(req),
-                    ..
-                }) => {
-                    if self
-                        .resolve_tasks
-                        .try_push(
-                            resolve(req.client.payload.domain.clone()),
-                            Either::Left(req),
-                        )
-                        .is_err()
-                    {
-                        tracing::warn!("Too many connections requests, dropping existing one");
-                    };
-
+                Poll::Ready(event) => {
+                    self.handle_portal_event(event);
                     continue;
                 }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::AllowAccess(req),
-                    ..
-                }) => {
-                    if self
-                        .resolve_tasks
-                        .try_push(resolve(req.payload.clone()), Either::Right(req))
-                        .is_err()
-                    {
-                        tracing::warn!("Too many allow access requests, dropping existing one");
-                    };
-
-                    continue;
-                }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg:
-                        IngressMessages::IceCandidates(ClientIceCandidates {
-                            client_id,
-                            candidates,
-                        }),
-                    ..
-                }) => {
-                    for candidate in candidates {
-                        self.tunnel.add_ice_candidate(client_id, candidate);
-                    }
-                    continue;
-                }
-
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg:
-                        IngressMessages::RejectAccess(RejectAccess {
-                            client_id,
-                            resource_id,
-                        }),
-                    ..
-                }) => {
-                    self.tunnel.remove_access(&client_id, &resource_id);
-                    continue;
-                }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::Init(_),
-                    ..
-                }) => {
-                    // TODO: Handle `init` message during operation.
-                    continue;
-                }
-                _ => {}
+                Poll::Pending => {}
             }
 
             return Poll::Pending;
         }
     }
 
+    fn handle_tunnel_event(&mut self, event: firezone_tunnel::GatewayEvent) {
+        match event {
+            firezone_tunnel::GatewayEvent::SignalIceCandidate {
+                conn_id: client,
+                candidate,
+            } => {
+                self.portal.send(
+                    PHOENIX_TOPIC,
+                    EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates {
+                        client_ids: vec![client],
+                        candidates: vec![candidate],
+                    }),
+                );
+            }
+        }
+    }
+
+    fn handle_portal_event(&mut self, event: phoenix_channel::Event<IngressMessages, ()>) {
+        match event {
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::RequestConnection(req),
+                ..
+            } => {
+                if self
+                    .resolve_tasks
+                    .try_push(
+                        resolve(req.client.payload.domain.clone()),
+                        Either::Left(req),
+                    )
+                    .is_err()
+                {
+                    tracing::warn!("Too many connections requests, dropping existing one");
+                };
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::AllowAccess(req),
+                ..
+            } => {
+                if self
+                    .resolve_tasks
+                    .try_push(resolve(req.payload.clone()), Either::Right(req))
+                    .is_err()
+                {
+                    tracing::warn!("Too many allow access requests, dropping existing one");
+                };
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg:
+                    IngressMessages::IceCandidates(ClientIceCandidates {
+                        client_id,
+                        candidates,
+                    }),
+                ..
+            } => {
+                for candidate in candidates {
+                    self.tunnel.add_ice_candidate(client_id, candidate);
+                }
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg:
+                    IngressMessages::RejectAccess(RejectAccess {
+                        client_id,
+                        resource_id,
+                    }),
+                ..
+            } => {
+                self.tunnel.remove_access(&client_id, &resource_id);
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::Init(_),
+                ..
+            } => {
+                // TODO: Handle `init` message during operation.
+            }
+            phoenix_channel::Event::ErrorResponse { topic, req_id, res } => {
+                tracing::warn!(%topic, %req_id, "Request failed: {res:?}");
+            }
+            phoenix_channel::Event::SuccessResponse { res: (), .. }
+            | phoenix_channel::Event::HeartbeatSent
+            | phoenix_channel::Event::JoinedRoom { .. } => {}
+        }
+    }
+
     pub fn accept_connection(
         &mut self,
         result: Result<Vec<IpAddr>, Timeout>,
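
The invariant described in the commit message generalizes to any hand-rolled eventloop over several `Poll`-based sources. Below is a minimal, self-contained sketch of the drain-then-suspend pattern. The `SubStateMachine` type, its string events, and the `main` driver are hypothetical stand-ins for illustration (they are not Firezone APIs); a real source would register the `Waker` from `cx` before returning `Poll::Pending`. The driver uses `Waker::noop()`, which assumes Rust 1.85 or newer.

use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};

/// Hypothetical stand-in for a sub-state machine such as the tunnel or the
/// portal channel: it yields queued events and is `Pending` otherwise.
struct SubStateMachine {
    events: VecDeque<&'static str>,
}

impl SubStateMachine {
    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<&'static str> {
        match self.events.pop_front() {
            Some(event) => Poll::Ready(event),
            // A real implementation MUST store `_cx.waker()` here so the task
            // is woken when a new event arrives; otherwise the eventloop
            // would suspend forever once it returns `Pending` to the runtime.
            None => Poll::Pending,
        }
    }
}

struct Eventloop {
    tunnel: SubStateMachine,
    portal: SubStateMachine,
}

impl Eventloop {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            // On `Ready`, handle the event and `continue` so that every
            // source is re-polled from the top before we consider suspending.
            if let Poll::Ready(event) = self.tunnel.poll(cx) {
                println!("tunnel event: {event}");
                continue;
            }

            if let Poll::Ready(event) = self.portal.poll(cx) {
                println!("portal event: {event}");
                continue;
            }

            // Every sub-state machine returned `Pending` during this
            // iteration, i.e. each has had the chance to register a `Waker`.
            // Only now is it safe to suspend.
            return Poll::Pending;
        }
    }
}

fn main() {
    let mut eventloop = Eventloop {
        tunnel: SubStateMachine {
            events: VecDeque::from(["ice-candidate"]),
        },
        portal: SubStateMachine {
            events: VecDeque::from(["request-connection"]),
        },
    };

    // `Waker::noop()` (stable since Rust 1.85) gives us a `Context` for a
    // one-off poll without a real runtime.
    let mut cx = Context::from_waker(Waker::noop());

    // A single call drains BOTH ready events before returning `Pending`.
    assert_eq!(eventloop.poll(&mut cx), Poll::Pending);
}

The key point mirrors the patch: every `Poll::Ready` arm ends in `continue`, so `return Poll::Pending` is reachable only on an iteration in which every source reported `Poll::Pending` and has therefore registered a waker.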