From f1a5af356d196e30c4fd56ff23ec62f3a785eea7 Mon Sep 17 00:00:00 2001
From: Jamil
Date: Fri, 25 Jul 2025 17:04:41 -0400
Subject: [PATCH] fix(portal): groom resource list and flows periodically (#10005)

Time-based policy conditions are tricky. When they authorize a flow, we
correctly tell the Gateway to remove access when the time window expires.
However, we do nothing on the client to reset the connectivity state. This
means that whenever the access window was re-entered, the client would
essentially never be able to connect to the resource again until it was
toggled.

To fix this, we add a 1-minute check in the client channel that re-checks
allowed resources and updates the client state with the difference. This
means that policies with time-based conditions are only accurate to the
minute, but this is how they're presented anyhow.

For good measure, we also add a periodic job that runs every minute to
delete expired Flows. This will propagate to the Gateway, which will
receive `reject_access` if access for a particular client-resource pair is
determined to be actually gone.

Zooming out a bit, this PR furthers the theme that:

- Client channels react directly to underlying resource / policy /
  membership changes, while
- Gateway channels react primarily to flows being deleted, or to the
  downstream effects of a prior client authorization

---
 elixir/apps/api/lib/api/client/channel.ex | 129 ++++++++++++++----
 elixir/apps/domain/lib/domain/application.ex | 1 -
 elixir/apps/domain/lib/domain/flows.ex | 6 +
 .../domain/lib/domain/flows/flow/query.ex | 5 +
 .../domain/flows/jobs/delete_expired_flows.ex | 19 +++
 ...0725031509_reindex_flows_on_expires_at.exs | 13 ++
 elixir/config/config.exs | 19 ++-
 elixir/config/prod.exs | 12 --
 elixir/config/runtime.exs | 8 ++
 9 files changed, 172 insertions(+), 40 deletions(-)
 create mode 100644 elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex
 create mode 100644 elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs

diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex
index 3002b7096..d95f30cd8 100644
--- a/elixir/apps/api/lib/api/client/channel.ex
+++ b/elixir/apps/api/lib/api/client/channel.ex
@@ -20,6 +20,11 @@ defmodule API.Client.Channel do
   require Logger
   require OpenTelemetry.Tracer
 
+  # For time-based policy conditions, we need to determine whether we still have access.
+  # If not, we need to send resource_deleted so that if it's added back later, the client's
+  # connlib state will be cleaned up so it can request a new connection.
+  @recompute_authorized_resources_every :timer.minutes(1)
+
   @gateway_compatibility [
     # We introduced new websocket protocol and the clients of version 1.4+
     # are only compatible with gateways of version 1.4+
@@ -63,6 +68,13 @@
   # Called immediately after the client joins the channel
   def handle_info(:after_join, socket) do
+    # Schedule reassessing allowed resources
+    Process.send_after(
+      self(),
+      :recompute_authorized_resources,
+      @recompute_authorized_resources_every
+    )
+
     # Initialize the cache.
socket = socket @@ -86,8 +98,15 @@ defmodule API.Client.Channel do # Initialize resources resources = authorized_resources(socket) + # Save list of authorized resources in the socket to check against in + # the recompute_authorized_resources timer + socket = assign(socket, authorized_resource_ids: MapSet.new(Enum.map(resources, & &1.id))) + # Delete any stale flows for resources we may not have access to anymore - Flows.delete_stale_flows_on_connect(socket.assigns.client, resources) + Flows.delete_stale_flows_on_connect( + socket.assigns.client, + resources + ) push(socket, "init", %{ resources: Views.Resource.render_many(resources), @@ -107,6 +126,34 @@ defmodule API.Client.Channel do {:noreply, socket} end + # Needed to keep the client's resource list up to date for time-based policy conditions + def handle_info(:recompute_authorized_resources, socket) do + Process.send_after( + self(), + :recompute_authorized_resources, + @recompute_authorized_resources_every + ) + + old_authorized_resources = + Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids)) + |> Map.values() + + new_authorized_resources = authorized_resources(socket) + + for resource <- old_authorized_resources -- new_authorized_resources do + push(socket, "resource_deleted", resource.id) + end + + for resource <- new_authorized_resources -- old_authorized_resources do + push(socket, "resource_created_or_updated", Views.Resource.render(resource)) + end + + {:noreply, + assign(socket, + authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id)) + )} + end + # Called to actually push relays_presence with a disconnected relay to the client def handle_info({:push_leave, relay_id, stamp_secret, payload}, socket) do {:noreply, Debouncer.handle_leave(socket, relay_id, stamp_secret, payload, &push/3)} @@ -139,8 +186,10 @@ defmodule API.Client.Channel do %{assigns: %{client: %{actor_id: id}}} = socket ) when id == actor_id do - # 1. Get existing resources - existing_resources = authorized_resources(socket) + # 1. Get existing authorized resources + old_authorized_resources = + Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids)) + |> Map.values() # 2. Re-hydrate our policies and resources # It's not ideal we're hitting the DB here, but in practice it shouldn't be an issue because @@ -152,14 +201,19 @@ defmodule API.Client.Channel do memberships = Map.put(socket.assigns.memberships, group_id, membership) socket = assign(socket, memberships: memberships) - # 3. Get new resources - new_resources = authorized_resources(socket) -- existing_resources + # 3. Compute new authorized resources + new_authorized_resources = authorized_resources(socket) # 4. Push new resources to the client - for resource <- new_resources do + for resource <- new_authorized_resources -- old_authorized_resources do push(socket, "resource_created_or_updated", Views.Resource.render(resource)) end + socket = + assign(socket, + authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id)) + ) + {:noreply, socket} end @@ -209,27 +263,33 @@ defmodule API.Client.Channel do ) when id == client_id do # 1. Snapshot existing authorized resources - existing_resources = authorized_resources(socket) + old_authorized_resources = + Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids)) + |> Map.values() # 2. Update our state socket = assign(socket, client: client) # 3. 
If client's verification status changed, send diff of resources - if old_client.verified_at != client.verified_at do - resources = authorized_resources(socket) + socket = + if old_client.verified_at != client.verified_at do + new_authorized_resources = authorized_resources(socket) - added_resources = resources -- existing_resources - removed_resources = existing_resources -- resources + for resource <- new_authorized_resources -- old_authorized_resources do + push(socket, "resource_created_or_updated", Views.Resource.render(resource)) + end - for resource <- added_resources do - push(socket, "resource_created_or_updated", Views.Resource.render(resource)) + for resource <- old_authorized_resources -- new_authorized_resources do + push(socket, "resource_deleted", resource.id) + end + + assign(socket, + authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id)) + ) + else + socket end - for resource <- removed_resources do - push(socket, "resource_deleted", resource.id) - end - end - {:noreply, socket} end @@ -274,7 +334,9 @@ defmodule API.Client.Channel do # 1. Check if this policy is for us if Map.has_key?(socket.assigns.memberships, policy.actor_group_id) do # 2. Snapshot existing resources - existing_resources = authorized_resources(socket) + old_authorized_resources = + Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids)) + |> Map.values() # 3. Hydrate a new resource if we aren't already tracking it socket = @@ -287,14 +349,19 @@ defmodule API.Client.Channel do preload: :gateway_groups ) - assign(socket, resources: Map.put(socket.assigns.resources, resource.id, resource)) + socket + |> assign(resources: Map.put(socket.assigns.resources, resource.id, resource)) + |> assign( + authorized_resource_ids: + MapSet.put(socket.assigns.authorized_resource_ids, resource.id) + ) end # 4. Hydrate the new policy socket = assign(socket, policies: Map.put(socket.assigns.policies, policy.id, policy)) # 5. Maybe send new resource - if resource = (authorized_resources(socket) -- existing_resources) |> List.first() do + if resource = (authorized_resources(socket) -- old_authorized_resources) |> List.first() do push(socket, "resource_created_or_updated", Views.Resource.render(resource)) end @@ -347,15 +414,24 @@ defmodule API.Client.Channel do # 1. Check if this policy is for us if Map.has_key?(socket.assigns.policies, policy.id) do # 2. Snapshot existing resources - existing_resources = authorized_resources(socket) + old_authorized_resources = + Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids)) + |> Map.values() # 3. Update our state socket = assign(socket, policies: Map.delete(socket.assigns.policies, policy.id)) r_ids = Enum.map(socket.assigns.policies, fn {_id, p} -> p.resource_id end) |> Enum.uniq() socket = assign(socket, resources: Map.take(socket.assigns.resources, r_ids)) + authorized_resources = authorized_resources(socket) + + socket = + assign(socket, + authorized_resource_ids: MapSet.new(Enum.map(authorized_resources, & &1.id)) + ) + # 4. Push deleted resource to the client if we lost access to it - if resource = (existing_resources -- authorized_resources(socket)) |> List.first() do + if resource = (old_authorized_resources -- authorized_resources) |> List.first() do push(socket, "resource_deleted", resource.id) end @@ -382,7 +458,7 @@ defmodule API.Client.Channel do socket = assign(socket, resources: Map.put(socket.assigns.resources, resource_id, resource)) # 4. 
If resource is allowed, push - if resource in authorized_resources(socket) do + if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) do # Connlib doesn't handle resources changing sites, so we need to delete then create # See https://github.com/firezone/firezone/issues/9881 push(socket, "resource_deleted", resource.id) @@ -414,7 +490,7 @@ defmodule API.Client.Channel do push(socket, "resource_deleted", resource.id) # 4. If resource is allowed, and has at least one site connected, push - if resource.id in Enum.map(authorized_resources(socket), & &1.id) and + if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and length(resource.gateway_groups) > 0 do # Connlib doesn't handle resources changing sites, so we need to delete then create # See https://github.com/firezone/firezone/issues/9881 @@ -453,7 +529,8 @@ defmodule API.Client.Channel do old_resource.address_description != resource.address_description or old_resource.name != resource.name - if resource.id in Enum.map(authorized_resources(socket), & &1.id) and resource_changed? do + if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and + resource_changed? do push(socket, "resource_created_or_updated", Views.Resource.render(resource)) end diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex index 85ccd90a0..d20a71a13 100644 --- a/elixir/apps/domain/lib/domain/application.ex +++ b/elixir/apps/domain/lib/domain/application.ex @@ -75,7 +75,6 @@ defmodule Domain.Application do end end - # TODO: Configure Oban workers to only run on domain nodes defp oban do [{Oban, Application.fetch_env!(:domain, Oban)}] end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 1c0cb5837..3e240e791 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -162,6 +162,12 @@ defmodule Domain.Flows do end end + def delete_expired_flows do + Flow.Query.all() + |> Flow.Query.expired() + |> Repo.delete_all() + end + def delete_flows_for(%Domain.Accounts.Account{} = account) do Flow.Query.all() |> Flow.Query.by_account_id(account.id) diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index ef10e103f..3c55ce732 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -5,6 +5,11 @@ defmodule Domain.Flows.Flow.Query do from(flows in Domain.Flows.Flow, as: :flows) end + def expired(queryable) do + now = DateTime.utc_now() + where(queryable, [flows: flows], flows.expires_at <= ^now) + end + def not_expired(queryable) do now = DateTime.utc_now() where(queryable, [flows: flows], flows.expires_at > ^now) diff --git a/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex b/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex new file mode 100644 index 000000000..9d7b14aca --- /dev/null +++ b/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex @@ -0,0 +1,19 @@ +defmodule Domain.Flows.Jobs.DeleteExpiredFlows do + @moduledoc """ + Job to delete expired flows. 
+ """ + use Oban.Worker, queue: :default + + require Logger + + alias Domain.Flows + + @impl Oban.Worker + def perform(_args) do + {count, nil} = Flows.delete_expired_flows() + + Logger.info("Deleted #{count} expired flows") + + :ok + end +end diff --git a/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs b/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs new file mode 100644 index 000000000..c4deb258d --- /dev/null +++ b/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs @@ -0,0 +1,13 @@ +defmodule Domain.Repo.Migrations.IndexFlowsOnExpiresAt do + use Ecto.Migration + + @disable_ddl_transaction true + + def change do + drop_if_exists(index(:flows, [:account_id, :expires_at, :gateway_id], concurrently: true)) + + create_if_not_exists( + index(:flows, [:expires_at, :account_id, :gateway_id], concurrently: true) + ) + end +end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 25bb8b8c0..1d03052b2 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -203,8 +203,25 @@ config :domain, outbound_email_adapter_configured?: false config :domain, web_external_url: "http://localhost:13000" config :domain, Oban, - engine: Oban.Engines.Basic, + plugins: [ + # Keep the last 90 days of completed, cancelled, and discarded jobs + {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90}, + + # Rescue jobs that may have failed due to transient errors like deploys + # or network issues. It's not guaranteed that the job won't be executed + # twice, so for now we disable it since all of our Oban jobs can be retried + # without loss. + # {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)} + + # Periodic jobs + {Oban.Plugins.Cron, + crontab: [ + # Delete expired flows every minute + {"* * * * *", Domain.Flows.Jobs.DeleteExpiredFlows} + ]} + ], queues: [default: 10], + engine: Oban.Engines.Basic, repo: Domain.Repo ############################### diff --git a/elixir/config/prod.exs b/elixir/config/prod.exs index 0a057f3ee..5668c0f1b 100644 --- a/elixir/config/prod.exs +++ b/elixir/config/prod.exs @@ -8,18 +8,6 @@ config :domain, Domain.Repo, pool_size: 10, show_sensitive_data_on_connection_error: false -config :domain, Oban, - plugins: [ - # Keep the last 90 days of completed, cancelled, and discarded jobs - {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90} - - # Rescue jobs that may have failed due to transient errors like deploys - # or network issues. It's not guaranteed that the job won't be executed - # twice, so for now we disable it since all of our Oban jobs can be retried - # without loss. - # {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)} - ] - ############################### ##### Web ##################### ############################### diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index b1f6028ea..7a0489f8f 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -148,6 +148,14 @@ if config_env() == :prod do env_var_to_config!(:background_jobs_enabled) and Enum.member?(env_var_to_config!(:auth_provider_adapters), :mock) + config :domain, Oban, + queues: + if(env_var_to_config!(:background_jobs_enabled), + do: [default: 10], + # Using an empty queue prevents jobs from running on other nodes + else: [] + ) + if web_external_url = env_var_to_config!(:web_external_url) do %{ scheme: web_external_url_scheme,