fix(relay): don't starve items further down in the event-loop (#8177)

At present, the relay uses a priority ordering in its event-loop that
favors routing traffic. Whenever a task further up in the loop is
`Poll::Ready`, we jump back to the top and start processing from there
again. The issue is that, in very busy times, this can starve the
processing of timers and of messages from the portal. When we do
finally get to process portal messages, we conclude that the portal
hasn't replied in some time and proactively cut the connection and
reconnect.

As a result, the portal will send `relays_presence` messages to the
clients and gateways, which will in turn locally remove the relay.
This breaks relayed connections.

To fix this, instead of immediately jumping back to the top of the
event-loop with `continue`, we only set a boolean and return
`Poll::Pending` once a full pass makes no progress. This gives every
component of the event-loop a chance to execute on each pass, even
when one component is very busy (sketched below).

Related: #8165
Related: #8176
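
To illustrate, here is a minimal, self-contained sketch of the pattern
(hypothetical struct and method names, not the relay's actual types):

use std::task::{Context, Poll};

/// Hypothetical event-loop with two sources, for illustration only.
struct EventLoop {
    // ... sockets, timers, portal connection, ...
}

impl EventLoop {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            let mut ready = false;

            // Source 1: e.g. pending commands.
            if let Poll::Ready(_cmd) = self.poll_commands(cx) {
                // Previously: `continue;`, which jumps back to the top and
                // starves the sources below whenever this one is busy.
                ready = true;
            }

            // Source 2: e.g. timers. Now polled at least once per pass,
            // even if source 1 is `Poll::Ready` every single time.
            if let Poll::Ready(_deadline) = self.poll_timer(cx) {
                ready = true;
            }

            // Suspend only once a full pass made no progress. This is sound
            // because every `Poll::Pending` above has already registered the
            // waker from `cx`.
            if !ready {
                return Poll::Pending;
            }
        }
    }

    // Stubs standing in for the real event sources.
    fn poll_commands(&mut self, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Pending
    }

    fn poll_timer(&mut self, _cx: &mut Context<'_>) -> Poll<std::time::Instant> {
        Poll::Pending
    }
}

The priority ordering is preserved (busier sources are still polled
first within a pass), but every source is now polled at least once
before the task yields.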
commit 3e4976e4ab
parent 2e43523f75
Author: Thomas Eizinger
Committer: GitHub
Date: 2025-02-18 23:00:32 +11:00


@@ -393,6 +393,8 @@ where
     fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
         loop {
+            let mut ready = false;
+
             if self.shutting_down && self.channel.is_none() && self.server.num_allocations() == 0 {
                 return Poll::Ready(Ok(()));
             }

@@ -433,7 +435,7 @@ where
                     }
                 }

-                continue; // Attempt to process more commands.
+                ready = true;
             }

             // Priority 2: Read from our sockets.
@@ -482,7 +484,8 @@ where
                             tracing::warn!(target: "relay", %peer, "Failed to relay data to peer: {}", err_with_src(&e));
                         }
                     };
-                    continue;
+
+                    ready = true;
                 }
                 Poll::Ready(Ok(sockets::Received {
                     port, // Packets coming in on any other port are from peers.
@@ -508,11 +511,13 @@ where
                             tracing::warn!(target: "relay", %client, "Failed to relay data to client: {}", err_with_src(&e));
                         };
                     };
-                    continue;
+
+                    ready = true;
                 }
                 Poll::Ready(Err(sockets::Error::Io(e))) => {
                     tracing::warn!(target: "relay", "Error while receiving message: {}", err_with_src(&e));
-                    continue;
+
+                    ready = true;
                 }
                 Poll::Ready(Err(sockets::Error::MioTaskCrashed(e))) => return Poll::Ready(Err(e)), // Fail the event-loop. We can't operate without the `mio` worker-task.
                 Poll::Pending => {}
@@ -521,13 +526,14 @@ where
             // Priority 3: Check when we need to next be woken. This needs to happen after all state modifications.
             if let Some(timeout) = self.server.poll_timeout() {
                 Pin::new(&mut self.sleep).reset(timeout);
-                // Purposely no `continue` because we just change the state of `sleep` and we poll it below.
+                // Purposely no `ready = true` because we just change the state of `sleep` and we poll it below.
             }

             // Priority 4: Handle time-sensitive tasks:
             if let Poll::Ready(deadline) = self.sleep.poll_unpin(cx) {
                 self.server.handle_timeout(deadline);
-                continue; // Handle potentially new commands.
+
+                ready = true;
             }

             // Priority 5: Handle portal messages
@@ -536,7 +542,7 @@ where
                     let event = result.context("Portal connection failed")?;
                     self.handle_portal_event(event);

-                    continue;
+                    ready = true;
                 }
                 Some(Poll::Pending) | None => {}
             }
@@ -563,7 +569,7 @@ where
                     }
                 }

-                continue;
+                ready = true;
             }
             Poll::Ready(None) | Poll::Pending => {}
         }
@@ -580,10 +586,12 @@ where
                 tracing::info!(target: "relay", "Allocations = {num_allocations} Channels = {num_channels} Throughput = {}", fmt_human_throughput(avg_throughput as f64));

-                continue;
+                ready = true;
             }

-            return Poll::Pending;
+            if !ready {
+                break Poll::Pending;
+            }
         }
     }