fix(portal): groom resource list and flows periodically (#10005)

Time-based policy conditions are tricky. When they authorize a flow, we
correctly tell the Gateway to remove access when the time window
expires.

However, we do nothing on the client to reset the connectivity state.
This means that whenever the access time window was re-entered, the
client would essentially never be able to connect to the resource again
until it was toggled.

To fix this, we add a 1-minute check in the client channel that
re-checks allowed resources, and updates the client state with the
difference. This means that policies that have time-based conditions are
only accurate to the minute, but this is how they're presented anyhow.


For good measure, we also add a periodic job that runs every minute to
delete expired Flows. This will propagate to the Gateway which, if the
access for a particular client-resource pair is determined to be
actually gone, will receive `reject_access`.

Zooming out a bit, this PR furthers the theme that:

- Client channels react to underlying resource / policy / membership
changes directly, while
- Gateway channels react primarily to flows being deleted, or the
downstream effects of a prior client authorization
This commit is contained in:
Jamil
2025-07-25 17:04:41 -04:00
committed by GitHub
parent 2959cca8ce
commit f1a5af356d
9 changed files with 172 additions and 40 deletions

View File

@@ -20,6 +20,11 @@ defmodule API.Client.Channel do
require Logger
require OpenTelemetry.Tracer
# For time-based policy conditions, we need to determine whether we still have access
# If not, we need to send resource_deleted so that if it's added back later, the client's
# connlib state will be cleaned up so it can request a new connection.
@recompute_authorized_resources_every :timer.minutes(1)
@gateway_compatibility [
# We introduced new websocket protocol and the clients of version 1.4+
# are only compatible with gateways of version 1.4+
@@ -63,6 +68,13 @@ defmodule API.Client.Channel do
# Called immediately after the client joins the channel
def handle_info(:after_join, socket) do
# Schedule reassessing allowed resources
Process.send_after(
self(),
:recompute_authorized_resources,
@recompute_authorized_resources_every
)
# Initialize the cache.
socket =
socket
@@ -86,8 +98,15 @@ defmodule API.Client.Channel do
# Initialize resources
resources = authorized_resources(socket)
# Save list of authorized resources in the socket to check against in
# the recompute_authorized_resources timer
socket = assign(socket, authorized_resource_ids: MapSet.new(Enum.map(resources, & &1.id)))
# Delete any stale flows for resources we may not have access to anymore
Flows.delete_stale_flows_on_connect(socket.assigns.client, resources)
Flows.delete_stale_flows_on_connect(
socket.assigns.client,
resources
)
push(socket, "init", %{
resources: Views.Resource.render_many(resources),
@@ -107,6 +126,34 @@ defmodule API.Client.Channel do
{:noreply, socket}
end
# Needed to keep the client's resource list up to date for time-based policy conditions
def handle_info(:recompute_authorized_resources, socket) do
Process.send_after(
self(),
:recompute_authorized_resources,
@recompute_authorized_resources_every
)
old_authorized_resources =
Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
|> Map.values()
new_authorized_resources = authorized_resources(socket)
for resource <- old_authorized_resources -- new_authorized_resources do
push(socket, "resource_deleted", resource.id)
end
for resource <- new_authorized_resources -- old_authorized_resources do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end
{:noreply,
assign(socket,
authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
)}
end
# Called to actually push relays_presence with a disconnected relay to the client
def handle_info({:push_leave, relay_id, stamp_secret, payload}, socket) do
{:noreply, Debouncer.handle_leave(socket, relay_id, stamp_secret, payload, &push/3)}
@@ -139,8 +186,10 @@ defmodule API.Client.Channel do
%{assigns: %{client: %{actor_id: id}}} = socket
)
when id == actor_id do
# 1. Get existing resources
existing_resources = authorized_resources(socket)
# 1. Get existing authorized resources
old_authorized_resources =
Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
|> Map.values()
# 2. Re-hydrate our policies and resources
# It's not ideal we're hitting the DB here, but in practice it shouldn't be an issue because
@@ -152,14 +201,19 @@ defmodule API.Client.Channel do
memberships = Map.put(socket.assigns.memberships, group_id, membership)
socket = assign(socket, memberships: memberships)
# 3. Get new resources
new_resources = authorized_resources(socket) -- existing_resources
# 3. Compute new authorized resources
new_authorized_resources = authorized_resources(socket)
# 4. Push new resources to the client
for resource <- new_resources do
for resource <- new_authorized_resources -- old_authorized_resources do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end
socket =
assign(socket,
authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
)
{:noreply, socket}
end
@@ -209,27 +263,33 @@ defmodule API.Client.Channel do
)
when id == client_id do
# 1. Snapshot existing authorized resources
existing_resources = authorized_resources(socket)
old_authorized_resources =
Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
|> Map.values()
# 2. Update our state
socket = assign(socket, client: client)
# 3. If client's verification status changed, send diff of resources
if old_client.verified_at != client.verified_at do
resources = authorized_resources(socket)
socket =
if old_client.verified_at != client.verified_at do
new_authorized_resources = authorized_resources(socket)
added_resources = resources -- existing_resources
removed_resources = existing_resources -- resources
for resource <- new_authorized_resources -- old_authorized_resources do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end
for resource <- added_resources do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
for resource <- old_authorized_resources -- new_authorized_resources do
push(socket, "resource_deleted", resource.id)
end
assign(socket,
authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
)
else
socket
end
for resource <- removed_resources do
push(socket, "resource_deleted", resource.id)
end
end
{:noreply, socket}
end
@@ -274,7 +334,9 @@ defmodule API.Client.Channel do
# 1. Check if this policy is for us
if Map.has_key?(socket.assigns.memberships, policy.actor_group_id) do
# 2. Snapshot existing resources
existing_resources = authorized_resources(socket)
old_authorized_resources =
Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
|> Map.values()
# 3. Hydrate a new resource if we aren't already tracking it
socket =
@@ -287,14 +349,19 @@ defmodule API.Client.Channel do
preload: :gateway_groups
)
assign(socket, resources: Map.put(socket.assigns.resources, resource.id, resource))
socket
|> assign(resources: Map.put(socket.assigns.resources, resource.id, resource))
|> assign(
authorized_resource_ids:
MapSet.put(socket.assigns.authorized_resource_ids, resource.id)
)
end
# 4. Hydrate the new policy
socket = assign(socket, policies: Map.put(socket.assigns.policies, policy.id, policy))
# 5. Maybe send new resource
if resource = (authorized_resources(socket) -- existing_resources) |> List.first() do
if resource = (authorized_resources(socket) -- old_authorized_resources) |> List.first() do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end
@@ -347,15 +414,24 @@ defmodule API.Client.Channel do
# 1. Check if this policy is for us
if Map.has_key?(socket.assigns.policies, policy.id) do
# 2. Snapshot existing resources
existing_resources = authorized_resources(socket)
old_authorized_resources =
Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
|> Map.values()
# 3. Update our state
socket = assign(socket, policies: Map.delete(socket.assigns.policies, policy.id))
r_ids = Enum.map(socket.assigns.policies, fn {_id, p} -> p.resource_id end) |> Enum.uniq()
socket = assign(socket, resources: Map.take(socket.assigns.resources, r_ids))
authorized_resources = authorized_resources(socket)
socket =
assign(socket,
authorized_resource_ids: MapSet.new(Enum.map(authorized_resources, & &1.id))
)
# 4. Push deleted resource to the client if we lost access to it
if resource = (existing_resources -- authorized_resources(socket)) |> List.first() do
if resource = (old_authorized_resources -- authorized_resources) |> List.first() do
push(socket, "resource_deleted", resource.id)
end
@@ -382,7 +458,7 @@ defmodule API.Client.Channel do
socket = assign(socket, resources: Map.put(socket.assigns.resources, resource_id, resource))
# 4. If resource is allowed, push
if resource in authorized_resources(socket) do
if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) do
# Connlib doesn't handle resources changing sites, so we need to delete then create
# See https://github.com/firezone/firezone/issues/9881
push(socket, "resource_deleted", resource.id)
@@ -414,7 +490,7 @@ defmodule API.Client.Channel do
push(socket, "resource_deleted", resource.id)
# 4. If resource is allowed, and has at least one site connected, push
if resource.id in Enum.map(authorized_resources(socket), & &1.id) and
if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and
length(resource.gateway_groups) > 0 do
# Connlib doesn't handle resources changing sites, so we need to delete then create
# See https://github.com/firezone/firezone/issues/9881
@@ -453,7 +529,8 @@ defmodule API.Client.Channel do
old_resource.address_description != resource.address_description or
old_resource.name != resource.name
if resource.id in Enum.map(authorized_resources(socket), & &1.id) and resource_changed? do
if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and
resource_changed? do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end

View File

@@ -75,7 +75,6 @@ defmodule Domain.Application do
end
end
# TODO: Configure Oban workers to only run on domain nodes
defp oban do
[{Oban, Application.fetch_env!(:domain, Oban)}]
end

View File

@@ -162,6 +162,12 @@ defmodule Domain.Flows do
end
end
def delete_expired_flows do
Flow.Query.all()
|> Flow.Query.expired()
|> Repo.delete_all()
end
def delete_flows_for(%Domain.Accounts.Account{} = account) do
Flow.Query.all()
|> Flow.Query.by_account_id(account.id)

View File

@@ -5,6 +5,11 @@ defmodule Domain.Flows.Flow.Query do
from(flows in Domain.Flows.Flow, as: :flows)
end
def expired(queryable) do
now = DateTime.utc_now()
where(queryable, [flows: flows], flows.expires_at <= ^now)
end
def not_expired(queryable) do
now = DateTime.utc_now()
where(queryable, [flows: flows], flows.expires_at > ^now)

View File

@@ -0,0 +1,19 @@
defmodule Domain.Flows.Jobs.DeleteExpiredFlows do
@moduledoc """
Job to delete expired flows.
"""
use Oban.Worker, queue: :default
require Logger
alias Domain.Flows
@impl Oban.Worker
def perform(_args) do
{count, nil} = Flows.delete_expired_flows()
Logger.info("Deleted #{count} expired flows")
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Repo.Migrations.IndexFlowsOnExpiresAt do
use Ecto.Migration
@disable_ddl_transaction true
def change do
drop_if_exists(index(:flows, [:account_id, :expires_at, :gateway_id], concurrently: true))
create_if_not_exists(
index(:flows, [:expires_at, :account_id, :gateway_id], concurrently: true)
)
end
end

View File

@@ -203,8 +203,25 @@ config :domain, outbound_email_adapter_configured?: false
config :domain, web_external_url: "http://localhost:13000"
config :domain, Oban,
engine: Oban.Engines.Basic,
plugins: [
# Keep the last 90 days of completed, cancelled, and discarded jobs
{Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90},
# Rescue jobs that may have failed due to transient errors like deploys
# or network issues. It's not guaranteed that the job won't be executed
# twice, so for now we disable it since all of our Oban jobs can be retried
# without loss.
# {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)}
# Periodic jobs
{Oban.Plugins.Cron,
crontab: [
# Delete expired flows every minute
{"* * * * *", Domain.Flows.Jobs.DeleteExpiredFlows}
]}
],
queues: [default: 10],
engine: Oban.Engines.Basic,
repo: Domain.Repo
###############################

View File

@@ -8,18 +8,6 @@ config :domain, Domain.Repo,
pool_size: 10,
show_sensitive_data_on_connection_error: false
config :domain, Oban,
plugins: [
# Keep the last 90 days of completed, cancelled, and discarded jobs
{Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90}
# Rescue jobs that may have failed due to transient errors like deploys
# or network issues. It's not guaranteed that the job won't be executed
# twice, so for now we disable it since all of our Oban jobs can be retried
# without loss.
# {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)}
]
###############################
##### Web #####################
###############################

View File

@@ -148,6 +148,14 @@ if config_env() == :prod do
env_var_to_config!(:background_jobs_enabled) and
Enum.member?(env_var_to_config!(:auth_provider_adapters), :mock)
config :domain, Oban,
queues:
if(env_var_to_config!(:background_jobs_enabled),
do: [default: 10],
# Using an empty queue prevents jobs from running on other nodes
else: []
)
if web_external_url = env_var_to_config!(:web_external_url) do
%{
scheme: web_external_url_scheme,