diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex
index 3002b7096..d95f30cd8 100644
--- a/elixir/apps/api/lib/api/client/channel.ex
+++ b/elixir/apps/api/lib/api/client/channel.ex
@@ -20,6 +20,11 @@ defmodule API.Client.Channel do
   require Logger
   require OpenTelemetry.Tracer
 
+  # For time-based policy conditions, we need to determine whether we still have access.
+  # If not, we need to send resource_deleted so that if it's added back later, the client's
+  # connlib state will be cleaned up so it can request a new connection.
+  @recompute_authorized_resources_every :timer.minutes(1)
+
   @gateway_compatibility [
     # We introduced new websocket protocol and the clients of version 1.4+
     # are only compatible with gateways of version 1.4+
@@ -63,6 +68,13 @@ defmodule API.Client.Channel do
 
   # Called immediately after the client joins the channel
   def handle_info(:after_join, socket) do
+    # Schedule reassessing allowed resources
+    Process.send_after(
+      self(),
+      :recompute_authorized_resources,
+      @recompute_authorized_resources_every
+    )
+
     # Initialize the cache.
     socket =
       socket
@@ -86,8 +98,15 @@ defmodule API.Client.Channel do
     # Initialize resources
     resources = authorized_resources(socket)
 
+    # Save list of authorized resources in the socket to check against in
+    # the recompute_authorized_resources timer
+    socket = assign(socket, authorized_resource_ids: MapSet.new(Enum.map(resources, & &1.id)))
+
     # Delete any stale flows for resources we may not have access to anymore
-    Flows.delete_stale_flows_on_connect(socket.assigns.client, resources)
+    Flows.delete_stale_flows_on_connect(
+      socket.assigns.client,
+      resources
+    )
 
     push(socket, "init", %{
       resources: Views.Resource.render_many(resources),
@@ -107,6 +126,34 @@ defmodule API.Client.Channel do
     {:noreply, socket}
   end
 
+  # Needed to keep the client's resource list up to date for time-based policy conditions
+  def handle_info(:recompute_authorized_resources, socket) do
+    Process.send_after(
+      self(),
+      :recompute_authorized_resources,
+      @recompute_authorized_resources_every
+    )
+
+    old_authorized_resources =
+      Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
+      |> Map.values()
+
+    new_authorized_resources = authorized_resources(socket)
+
+    for resource <- old_authorized_resources -- new_authorized_resources do
+      push(socket, "resource_deleted", resource.id)
+    end
+
+    for resource <- new_authorized_resources -- old_authorized_resources do
+      push(socket, "resource_created_or_updated", Views.Resource.render(resource))
+    end
+
+    {:noreply,
+     assign(socket,
+       authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
+     )}
+  end
+
   # Called to actually push relays_presence with a disconnected relay to the client
   def handle_info({:push_leave, relay_id, stamp_secret, payload}, socket) do
     {:noreply, Debouncer.handle_leave(socket, relay_id, stamp_secret, payload, &push/3)}
@@ -139,8 +186,10 @@ defmodule API.Client.Channel do
         %{assigns: %{client: %{actor_id: id}}} = socket
       )
       when id == actor_id do
-    # 1. Get existing resources
-    existing_resources = authorized_resources(socket)
+    # 1. Get existing authorized resources
+    old_authorized_resources =
+      Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
+      |> Map.values()
 
     # 2. Re-hydrate our policies and resources
     # It's not ideal we're hitting the DB here, but in practice it shouldn't be an issue because
@@ -152,14 +201,19 @@ defmodule API.Client.Channel do
     memberships = Map.put(socket.assigns.memberships, group_id, membership)
     socket = assign(socket, memberships: memberships)
 
-    # 3. Get new resources
-    new_resources = authorized_resources(socket) -- existing_resources
+    # 3. Compute new authorized resources
+    new_authorized_resources = authorized_resources(socket)
 
     # 4. Push new resources to the client
-    for resource <- new_resources do
+    for resource <- new_authorized_resources -- old_authorized_resources do
       push(socket, "resource_created_or_updated", Views.Resource.render(resource))
     end
 
+    socket =
+      assign(socket,
+        authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
+      )
+
     {:noreply, socket}
   end
 
@@ -209,27 +263,33 @@ defmodule API.Client.Channel do
         )
       when id == client_id do
     # 1. Snapshot existing authorized resources
-    existing_resources = authorized_resources(socket)
+    old_authorized_resources =
+      Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
+      |> Map.values()
 
     # 2. Update our state
     socket = assign(socket, client: client)
 
     # 3. If client's verification status changed, send diff of resources
-    if old_client.verified_at != client.verified_at do
-      resources = authorized_resources(socket)
+    socket =
+      if old_client.verified_at != client.verified_at do
+        new_authorized_resources = authorized_resources(socket)
 
-      added_resources = resources -- existing_resources
-      removed_resources = existing_resources -- resources
+        for resource <- new_authorized_resources -- old_authorized_resources do
+          push(socket, "resource_created_or_updated", Views.Resource.render(resource))
+        end
 
-      for resource <- added_resources do
-        push(socket, "resource_created_or_updated", Views.Resource.render(resource))
+        for resource <- old_authorized_resources -- new_authorized_resources do
+          push(socket, "resource_deleted", resource.id)
+        end
+
+        assign(socket,
+          authorized_resource_ids: MapSet.new(Enum.map(new_authorized_resources, & &1.id))
+        )
+      else
+        socket
       end
 
-      for resource <- removed_resources do
-        push(socket, "resource_deleted", resource.id)
-      end
-    end
-
     {:noreply, socket}
   end
 
@@ -274,7 +334,9 @@ defmodule API.Client.Channel do
     # 1. Check if this policy is for us
     if Map.has_key?(socket.assigns.memberships, policy.actor_group_id) do
       # 2. Snapshot existing resources
-      existing_resources = authorized_resources(socket)
+      old_authorized_resources =
+        Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
+        |> Map.values()
 
       # 3. Hydrate a new resource if we aren't already tracking it
       socket =
@@ -287,14 +349,19 @@ defmodule API.Client.Channel do
             preload: :gateway_groups
           )
 
-        assign(socket, resources: Map.put(socket.assigns.resources, resource.id, resource))
+        socket
+        |> assign(resources: Map.put(socket.assigns.resources, resource.id, resource))
+        |> assign(
+          authorized_resource_ids:
+            MapSet.put(socket.assigns.authorized_resource_ids, resource.id)
+        )
       end
 
     # 4. Hydrate the new policy
     socket = assign(socket, policies: Map.put(socket.assigns.policies, policy.id, policy))
 
     # 5. Maybe send new resource
-    if resource = (authorized_resources(socket) -- existing_resources) |> List.first() do
+    if resource = (authorized_resources(socket) -- old_authorized_resources) |> List.first() do
       push(socket, "resource_created_or_updated", Views.Resource.render(resource))
     end
 
@@ -347,15 +414,24 @@ defmodule API.Client.Channel do
     # 1. Check if this policy is for us
     if Map.has_key?(socket.assigns.policies, policy.id) do
       # 2. Snapshot existing resources
-      existing_resources = authorized_resources(socket)
+      old_authorized_resources =
+        Map.take(socket.assigns.resources, MapSet.to_list(socket.assigns.authorized_resource_ids))
+        |> Map.values()
 
       # 3. Update our state
       socket = assign(socket, policies: Map.delete(socket.assigns.policies, policy.id))
       r_ids = Enum.map(socket.assigns.policies, fn {_id, p} -> p.resource_id end) |> Enum.uniq()
       socket = assign(socket, resources: Map.take(socket.assigns.resources, r_ids))
 
+      authorized_resources = authorized_resources(socket)
+
+      socket =
+        assign(socket,
+          authorized_resource_ids: MapSet.new(Enum.map(authorized_resources, & &1.id))
+        )
+
       # 4. Push deleted resource to the client if we lost access to it
-      if resource = (existing_resources -- authorized_resources(socket)) |> List.first() do
+      if resource = (old_authorized_resources -- authorized_resources) |> List.first() do
        push(socket, "resource_deleted", resource.id)
       end
 
@@ -382,7 +458,7 @@ defmodule API.Client.Channel do
     socket = assign(socket, resources: Map.put(socket.assigns.resources, resource_id, resource))
 
     # 4. If resource is allowed, push
-    if resource in authorized_resources(socket) do
+    if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) do
       # Connlib doesn't handle resources changing sites, so we need to delete then create
       # See https://github.com/firezone/firezone/issues/9881
       push(socket, "resource_deleted", resource.id)
@@ -414,7 +490,7 @@ defmodule API.Client.Channel do
     push(socket, "resource_deleted", resource.id)
 
     # 4. If resource is allowed, and has at least one site connected, push
-    if resource.id in Enum.map(authorized_resources(socket), & &1.id) and
+    if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and
          length(resource.gateway_groups) > 0 do
       # Connlib doesn't handle resources changing sites, so we need to delete then create
       # See https://github.com/firezone/firezone/issues/9881
@@ -453,7 +529,8 @@ defmodule API.Client.Channel do
         old_resource.address_description != resource.address_description or
         old_resource.name != resource.name
 
-    if resource.id in Enum.map(authorized_resources(socket), & &1.id) and resource_changed? do
+    if MapSet.member?(socket.assigns.authorized_resource_ids, resource.id) and
+         resource_changed? do
       push(socket, "resource_created_or_updated", Views.Resource.render(resource))
     end
 
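All of the channel.ex hunks above follow one pattern: keep the previously authorized resource ids cached in the socket assigns, recompute authorized_resources(socket), and push "resource_deleted" / "resource_created_or_updated" for whatever fell out of or into the set. Below is a minimal, self-contained sketch of that diffing step; the module and the sample data are illustrative only and are not part of the patch, but the `--` list subtraction mirrors what the handlers do on full resource structs.

defmodule AuthorizedResourceDiff do
  @moduledoc false

  # Given the resources that were authorized before and the freshly recomputed
  # list, return the events the channel would push. Like the handlers above,
  # it relies on `--` list subtraction, which compares whole terms.
  def events(old_authorized, new_authorized) do
    deleted =
      for resource <- old_authorized -- new_authorized, do: {"resource_deleted", resource.id}

    created =
      for resource <- new_authorized -- old_authorized,
          do: {"resource_created_or_updated", resource}

    deleted ++ created
  end
end

# Example: access to "res-a" lapsed (say, a time-based policy window closed) while
# "res-c" became newly authorized; "res-b" is unchanged and produces no event.
old = [%{id: "res-a"}, %{id: "res-b"}]
new = [%{id: "res-b"}, %{id: "res-c"}]

AuthorizedResourceDiff.events(old, new)
#=> [{"resource_deleted", "res-a"}, {"resource_created_or_updated", %{id: "res-c"}}]

In the real channel the events are sent with Phoenix's push/3 and the new id set is written back with assign/2; the sketch only isolates the set arithmetic.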
diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex
index 85ccd90a0..d20a71a13 100644
--- a/elixir/apps/domain/lib/domain/application.ex
+++ b/elixir/apps/domain/lib/domain/application.ex
@@ -75,7 +75,6 @@ defmodule Domain.Application do
     end
   end
 
-  # TODO: Configure Oban workers to only run on domain nodes
   defp oban do
     [{Oban, Application.fetch_env!(:domain, Oban)}]
   end
diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex
index 1c0cb5837..3e240e791 100644
--- a/elixir/apps/domain/lib/domain/flows.ex
+++ b/elixir/apps/domain/lib/domain/flows.ex
@@ -162,6 +162,12 @@ defmodule Domain.Flows do
     end
   end
 
+  def delete_expired_flows do
+    Flow.Query.all()
+    |> Flow.Query.expired()
+    |> Repo.delete_all()
+  end
+
   def delete_flows_for(%Domain.Accounts.Account{} = account) do
     Flow.Query.all()
     |> Flow.Query.by_account_id(account.id)
diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex
index ef10e103f..3c55ce732 100644
--- a/elixir/apps/domain/lib/domain/flows/flow/query.ex
+++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex
@@ -5,6 +5,11 @@ defmodule Domain.Flows.Flow.Query do
     from(flows in Domain.Flows.Flow, as: :flows)
   end
 
+  def expired(queryable) do
+    now = DateTime.utc_now()
+    where(queryable, [flows: flows], flows.expires_at <= ^now)
+  end
+
   def not_expired(queryable) do
     now = DateTime.utc_now()
     where(queryable, [flows: flows], flows.expires_at > ^now)
diff --git a/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex b/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex
new file mode 100644
index 000000000..9d7b14aca
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/flows/jobs/delete_expired_flows.ex
@@ -0,0 +1,19 @@
+defmodule Domain.Flows.Jobs.DeleteExpiredFlows do
+  @moduledoc """
+  Job to delete expired flows.
+  """
+  use Oban.Worker, queue: :default
+
+  require Logger
+
+  alias Domain.Flows
+
+  @impl Oban.Worker
+  def perform(_args) do
+    {count, nil} = Flows.delete_expired_flows()
+
+    Logger.info("Deleted #{count} expired flows")
+
+    :ok
+  end
+end
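Because DeleteExpiredFlows is a plain Oban.Worker that pattern-matches on the {count, nil} return of Repo.delete_all/1, it can be exercised synchronously with Oban.Testing.perform_job/2. The test below is only a sketch: Domain.DataCase and the flow fixture helper are assumptions about the test support tree, not something this patch adds.

defmodule Domain.Flows.Jobs.DeleteExpiredFlowsTest do
  # Assumed test-support modules; adjust to whatever the suite actually provides.
  use Domain.DataCase, async: true
  use Oban.Testing, repo: Domain.Repo

  alias Domain.Flows.Jobs.DeleteExpiredFlows

  test "deletes only flows whose expires_at is in the past" do
    # Hypothetical fixture helper returning persisted %Domain.Flows.Flow{} rows.
    expired = Fixtures.Flows.create_flow(expires_at: DateTime.add(DateTime.utc_now(), -60))
    live = Fixtures.Flows.create_flow(expires_at: DateTime.add(DateTime.utc_now(), 3600))

    assert :ok = perform_job(DeleteExpiredFlows, %{})

    refute Domain.Repo.get(Domain.Flows.Flow, expired.id)
    assert Domain.Repo.get(Domain.Flows.Flow, live.id)
  end
end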
+ """ + use Oban.Worker, queue: :default + + require Logger + + alias Domain.Flows + + @impl Oban.Worker + def perform(_args) do + {count, nil} = Flows.delete_expired_flows() + + Logger.info("Deleted #{count} expired flows") + + :ok + end +end diff --git a/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs b/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs new file mode 100644 index 000000000..c4deb258d --- /dev/null +++ b/elixir/apps/domain/priv/repo/manual_migrations/20250725031509_reindex_flows_on_expires_at.exs @@ -0,0 +1,13 @@ +defmodule Domain.Repo.Migrations.IndexFlowsOnExpiresAt do + use Ecto.Migration + + @disable_ddl_transaction true + + def change do + drop_if_exists(index(:flows, [:account_id, :expires_at, :gateway_id], concurrently: true)) + + create_if_not_exists( + index(:flows, [:expires_at, :account_id, :gateway_id], concurrently: true) + ) + end +end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 25bb8b8c0..1d03052b2 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -203,8 +203,25 @@ config :domain, outbound_email_adapter_configured?: false config :domain, web_external_url: "http://localhost:13000" config :domain, Oban, - engine: Oban.Engines.Basic, + plugins: [ + # Keep the last 90 days of completed, cancelled, and discarded jobs + {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90}, + + # Rescue jobs that may have failed due to transient errors like deploys + # or network issues. It's not guaranteed that the job won't be executed + # twice, so for now we disable it since all of our Oban jobs can be retried + # without loss. + # {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)} + + # Periodic jobs + {Oban.Plugins.Cron, + crontab: [ + # Delete expired flows every minute + {"* * * * *", Domain.Flows.Jobs.DeleteExpiredFlows} + ]} + ], queues: [default: 10], + engine: Oban.Engines.Basic, repo: Domain.Repo ############################### diff --git a/elixir/config/prod.exs b/elixir/config/prod.exs index 0a057f3ee..5668c0f1b 100644 --- a/elixir/config/prod.exs +++ b/elixir/config/prod.exs @@ -8,18 +8,6 @@ config :domain, Domain.Repo, pool_size: 10, show_sensitive_data_on_connection_error: false -config :domain, Oban, - plugins: [ - # Keep the last 90 days of completed, cancelled, and discarded jobs - {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 90} - - # Rescue jobs that may have failed due to transient errors like deploys - # or network issues. It's not guaranteed that the job won't be executed - # twice, so for now we disable it since all of our Oban jobs can be retried - # without loss. - # {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)} - ] - ############################### ##### Web ##################### ############################### diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index b1f6028ea..7a0489f8f 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -148,6 +148,14 @@ if config_env() == :prod do env_var_to_config!(:background_jobs_enabled) and Enum.member?(env_var_to_config!(:auth_provider_adapters), :mock) + config :domain, Oban, + queues: + if(env_var_to_config!(:background_jobs_enabled), + do: [default: 10], + # Using an empty queue prevents jobs from running on other nodes + else: [] + ) + if web_external_url = env_var_to_config!(:web_external_url) do %{ scheme: web_external_url_scheme,