fix(gateway): don't erroneously suspend eventloop (#4486)

Within the gateway's eventloop, we MUST only return `Poll::Pending` if
`Waker`s are registered for everything that still needs to happen. To
guarantee that, we MUST `loop` around the calls to `poll()` and drain
everything that is `Poll::Ready`.

Only once all sub-state machines return `Poll::Pending` can we
ourselves return `Poll::Pending`.
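
For illustration, a minimal sketch of that pattern (the `Sub` type, its
`poll_event` method, and `handle_event` are hypothetical stand-ins, not
the gateway's actual types):

```rust
use std::task::{Context, Poll};

// Hypothetical sub-state machine; stands in for the tunnel, the portal
// channel, the resolve tasks, etc.
struct Sub;

impl Sub {
    fn poll_event(&mut self, _cx: &mut Context<'_>) -> Poll<u32> {
        // A real implementation would register a `Waker` from `_cx`
        // before returning `Poll::Pending`.
        Poll::Pending
    }
}

struct Eventloop {
    sub_a: Sub,
    sub_b: Sub,
}

impl Eventloop {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            // Drain everything that is `Poll::Ready`: whenever a
            // sub-state machine makes progress, `continue` so all of
            // them get polled again before we consider suspending.
            if let Poll::Ready(event) = self.sub_a.poll_event(cx) {
                self.handle_event(event);
                continue;
            }
            if let Poll::Ready(event) = self.sub_b.poll_event(cx) {
                self.handle_event(event);
                continue;
            }

            // All sub-state machines returned `Poll::Pending`, i.e.
            // each has registered a `Waker`; only now is it correct to
            // suspend.
            return Poll::Pending;
        }
    }

    fn handle_event(&mut self, _event: u32) {}
}
```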
Author: Thomas Eizinger
Date:   2024-04-04 10:24:38 +11:00 (committed by GitHub)
parent f29bf5010b
commit e169150ee7

@@ -24,7 +24,7 @@ pub const PHOENIX_TOPIC: &str = "gateway";
 pub struct Eventloop {
     tunnel: GatewayTunnel<CallbackHandler>,
-    portal: PhoenixChannel<(), IngressMessages, EgressMessages>,
+    portal: PhoenixChannel<(), IngressMessages, ()>,
     resolve_tasks:
         futures_bounded::FuturesTupleSet<Vec<IpNetwork>, Either<RequestConnection, AllowAccess>>,
@@ -33,7 +33,7 @@ pub struct Eventloop {
 impl Eventloop {
     pub(crate) fn new(
         tunnel: GatewayTunnel<CallbackHandler>,
-        portal: PhoenixChannel<(), IngressMessages, EgressMessages>,
+        portal: PhoenixChannel<(), IngressMessages, ()>,
     ) -> Self {
         Self {
             tunnel,
@@ -47,18 +47,8 @@ impl Eventloop {
     pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Infallible>> {
         loop {
             match self.tunnel.poll_next_event(cx) {
-                Poll::Ready(Ok(firezone_tunnel::GatewayEvent::SignalIceCandidate {
-                    conn_id: client,
-                    candidate,
-                })) => {
-                    self.portal.send(
-                        PHOENIX_TOPIC,
-                        EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates {
-                            client_ids: vec![client],
-                            candidates: vec![candidate],
-                        }),
-                    );
+                Poll::Ready(Ok(event)) => {
+                    self.handle_tunnel_event(event);
                     continue;
                 }
                 Poll::Ready(Err(e)) => {
@@ -75,82 +65,106 @@ impl Eventloop {
                 }
                 Poll::Ready((result, Either::Right(req))) => {
                     self.allow_access(result, req);
                     continue;
                 }
                 Poll::Pending => {}
             }
             match self.portal.poll(cx)? {
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::RequestConnection(req),
-                    ..
-                }) => {
-                    if self
-                        .resolve_tasks
-                        .try_push(
-                            resolve(req.client.payload.domain.clone()),
-                            Either::Left(req),
-                        )
-                        .is_err()
-                    {
-                        tracing::warn!("Too many connections requests, dropping existing one");
-                    };
+                Poll::Ready(event) => {
+                    self.handle_portal_event(event);
                     continue;
                 }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::AllowAccess(req),
-                    ..
-                }) => {
-                    if self
-                        .resolve_tasks
-                        .try_push(resolve(req.payload.clone()), Either::Right(req))
-                        .is_err()
-                    {
-                        tracing::warn!("Too many allow access requests, dropping existing one");
-                    };
-                    continue;
-                }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg:
-                        IngressMessages::IceCandidates(ClientIceCandidates {
-                            client_id,
-                            candidates,
-                        }),
-                    ..
-                }) => {
-                    for candidate in candidates {
-                        self.tunnel.add_ice_candidate(client_id, candidate);
-                    }
-                    continue;
-                }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg:
-                        IngressMessages::RejectAccess(RejectAccess {
-                            client_id,
-                            resource_id,
-                        }),
-                    ..
-                }) => {
-                    self.tunnel.remove_access(&client_id, &resource_id);
-                    continue;
-                }
-                Poll::Ready(phoenix_channel::Event::InboundMessage {
-                    msg: IngressMessages::Init(_),
-                    ..
-                }) => {
-                    // TODO: Handle `init` message during operation.
-                    continue;
-                }
-                _ => {}
+                Poll::Pending => {}
             }
             return Poll::Pending;
         }
     }
+    fn handle_tunnel_event(&mut self, event: firezone_tunnel::GatewayEvent) {
+        match event {
+            firezone_tunnel::GatewayEvent::SignalIceCandidate {
+                conn_id: client,
+                candidate,
+            } => {
+                self.portal.send(
+                    PHOENIX_TOPIC,
+                    EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates {
+                        client_ids: vec![client],
+                        candidates: vec![candidate],
+                    }),
+                );
+            }
+        }
+    }
+    fn handle_portal_event(&mut self, event: phoenix_channel::Event<IngressMessages, ()>) {
+        match event {
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::RequestConnection(req),
+                ..
+            } => {
+                if self
+                    .resolve_tasks
+                    .try_push(
+                        resolve(req.client.payload.domain.clone()),
+                        Either::Left(req),
+                    )
+                    .is_err()
+                {
+                    tracing::warn!("Too many connections requests, dropping existing one");
+                };
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::AllowAccess(req),
+                ..
+            } => {
+                if self
+                    .resolve_tasks
+                    .try_push(resolve(req.payload.clone()), Either::Right(req))
+                    .is_err()
+                {
+                    tracing::warn!("Too many allow access requests, dropping existing one");
+                };
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg:
+                    IngressMessages::IceCandidates(ClientIceCandidates {
+                        client_id,
+                        candidates,
+                    }),
+                ..
+            } => {
+                for candidate in candidates {
+                    self.tunnel.add_ice_candidate(client_id, candidate);
+                }
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg:
+                    IngressMessages::RejectAccess(RejectAccess {
+                        client_id,
+                        resource_id,
+                    }),
+                ..
+            } => {
+                self.tunnel.remove_access(&client_id, &resource_id);
+            }
+            phoenix_channel::Event::InboundMessage {
+                msg: IngressMessages::Init(_),
+                ..
+            } => {
+                // TODO: Handle `init` message during operation.
+            }
+            phoenix_channel::Event::ErrorResponse { topic, req_id, res } => {
+                tracing::warn!(%topic, %req_id, "Request failed: {res:?}");
+            }
+            phoenix_channel::Event::SuccessResponse { res: (), .. }
+            | phoenix_channel::Event::HeartbeatSent
+            | phoenix_channel::Event::JoinedRoom { .. } => {}
+        }
+    }
     pub fn accept_connection(
         &mut self,
         result: Result<Vec<IpNetwork>, Timeout>,