diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index ee9264c83..0be0b7e5a 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -63,14 +63,11 @@ defmodule API.Client.Channel do # Called immediately after the client joins the channel def handle_info(:after_join, socket) do - # Initialize the cache. Flows contains active flows for this client, so we - # can toggle the affected resource if the active flow is deleted. That will allow - # the client to create a new flow if the resource is still authorized. + # Initialize the cache. socket = socket |> hydrate_policies_and_resources() - |> hydrate_membership_group_ids() - |> assign(flows: MapSet.new()) + |> hydrate_memberships() # Initialize relays {:ok, relays} = select_relays(socket) @@ -138,7 +135,7 @@ defmodule API.Client.Channel do # ACTOR_GROUP_MEMBERSHIPS def handle_info( - {:created, %Actors.Membership{actor_id: actor_id, group_id: group_id}}, + {:created, %Actors.Membership{actor_id: actor_id, group_id: group_id} = membership}, %{assigns: %{client: %{actor_id: id}}} = socket ) when id == actor_id do @@ -152,8 +149,8 @@ defmodule API.Client.Channel do socket = hydrate_policies_and_resources(socket) # 3. Update our membership group IDs - ids = MapSet.put(socket.assigns.membership_group_ids, group_id) - socket = assign(socket, membership_group_ids: ids) + memberships = Map.put(socket.assigns.memberships, group_id, membership) + socket = assign(socket, memberships: memberships) # 3. Get new resources new_resources = authorized_resources(socket) -- existing_resources @@ -192,13 +189,13 @@ defmodule API.Client.Channel do r_ids = Enum.map(policies, fn {_id, policy} -> policy.resource_id end) |> Enum.uniq() resources = Map.take(socket.assigns.resources, r_ids) - membership_group_ids = Map.delete(socket.assigns.membership_group_ids, group_id) + memberships = Map.delete(socket.assigns.memberships, group_id) socket = socket |> assign(policies: policies) |> assign(resources: resources) - |> assign(membership_group_ids: membership_group_ids) + |> assign(memberships: memberships) {:noreply, socket} end @@ -244,38 +241,6 @@ defmodule API.Client.Channel do disconnect(socket) end - # FLOWS - - def handle_info( - {:deleted, %Flows.Flow{client_id: client_id} = flow}, - %{assigns: %{client: %{id: id}}} = socket - ) - when client_id == id do - if MapSet.member?(socket.assigns.flows, flow.id) do - # If an active flow is deleted, we need to recreate it. - # To do that, we need to flap the resource on the client because it doesn't track flows. - # The gateway is also tracking flows and will have sent a reject_access for this client/resource - # if this was the last flow in its cache that was authorizing it. - push(socket, "resource_deleted", flow.resource_id) - - resource = Map.get(socket.assigns.resources, flow.resource_id) - - # Access to resource is still allowed, allow creating a new flow - if not is_nil(resource) and resource.id in Enum.map(authorized_resources(socket), & &1.id) do - push(socket, "resource_created_or_updated", Views.Resource.render(resource)) - else - Logger.warning("Active flow deleted for resource but resource not found in socket state", - resource_id: flow.resource_id, - flow_id: flow.id - ) - end - - {:noreply, assign(socket, flows: MapSet.delete(socket.assigns.flows, flow.id))} - else - {:noreply, socket} - end - end - # GATEWAY_GROUPS def handle_info( @@ -307,7 +272,7 @@ defmodule API.Client.Channel do def handle_info({:created, %Policies.Policy{} = policy}, socket) do # 1. Check if this policy is for us - if MapSet.member?(socket.assigns.membership_group_ids, policy.actor_group_id) do + if Map.has_key?(socket.assigns.memberships, policy.actor_group_id) do # 2. Snapshot existing resources existing_resources = authorized_resources(socket) @@ -731,7 +696,9 @@ defmodule API.Client.Channel do gateway, resource_id, policy, - socket.assigns.subject + Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id, + socket.assigns.subject, + expires_at ) preshared_key = generate_preshared_key() @@ -751,8 +718,6 @@ defmodule API.Client.Channel do }} ) - socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) - {:noreply, socket} else {:error, :not_found} -> @@ -886,7 +851,9 @@ defmodule API.Client.Channel do gateway, resource_id, policy, - socket.assigns.subject + Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id, + socket.assigns.subject, + expires_at ) :ok = @@ -902,8 +869,6 @@ defmodule API.Client.Channel do }} ) - socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) - {:noreply, socket} else {:error, :not_found} -> @@ -946,7 +911,9 @@ defmodule API.Client.Channel do gateway, resource_id, policy, - socket.assigns.subject + Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id, + socket.assigns.subject, + expires_at ) :ok = @@ -963,8 +930,6 @@ defmodule API.Client.Channel do }} ) - socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) - {:noreply, socket} else {:error, :not_found} -> @@ -1206,16 +1171,19 @@ defmodule API.Client.Channel do end end - defp hydrate_membership_group_ids(socket) do - OpenTelemetry.Tracer.with_span "client.hydrate_membership_group_ids", + defp hydrate_memberships(socket) do + OpenTelemetry.Tracer.with_span "client.hydrate_memberships", attributes: %{ account_id: socket.assigns.client.account_id } do - membership_group_ids = - Actors.all_actor_group_ids!(socket.assigns.subject.actor) - |> MapSet.new() + memberships = + Actors.all_memberships_for_actor!(socket.assigns.subject.actor) + |> Enum.map(fn membership -> + {membership.group_id, membership} + end) + |> Enum.into(%{}) - assign(socket, membership_group_ids: membership_group_ids) + assign(socket, memberships: memberships) end end @@ -1240,7 +1208,7 @@ defmodule API.Client.Channel do end end - # Returns either the authorized resource or an error tuple of violated properties + # Returns either the authorized policy or an error tuple of violated properties defp authorize_resource(socket, resource_id) do OpenTelemetry.Tracer.with_span "client.authorize_resource", attributes: %{ @@ -1264,7 +1232,11 @@ defmodule API.Client.Channel do {:ok, policy, expires_at} -> # Set a maximum expiration time for the authorization - {:ok, policy, expires_at || socket.assigns.subject.expires_at} + expires_at = + expires_at || socket.assigns.subject.expires_at || + DateTime.utc_now() |> DateTime.add(14, :day) + + {:ok, policy, expires_at} end end end diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index f6d2eac4a..f1530956e 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -111,19 +111,31 @@ defmodule API.Gateway.Channel do tuple = {flow.client_id, flow.resource_id} socket = - if flow_map = Map.get(socket.assigns.flows, tuple) do - flow_map = Map.delete(flow_map, flow.id) + if flows_map = Map.get(socket.assigns.flows, tuple) do + flow_map = Map.delete(flows_map, flow.id) - if map_size(flow_map) == 0 do - # Send reject_access if this was the last flow granting access for this client/resource - push(socket, "reject_access", %{ - client_id: flow.client_id, - resource_id: flow.resource_id - }) + with 0 <- map_size(flow_map), + {:ok, new_flow} <- Flows.reauthorize_flow(flow) do + flow_map = %{ + new_flow.id => new_flow.expires_at + } + + push( + socket, + "access_authorization_expiry_updated", + Views.Flow.render(new_flow, new_flow.expires_at) + ) - assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) - else assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map)) + else + _ -> + # Send reject_access if access is no longer granted + push(socket, "reject_access", %{ + client_id: flow.client_id, + resource_id: flow.resource_id + }) + + assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) end else socket @@ -144,7 +156,7 @@ defmodule API.Gateway.Channel do when old_filters != filters do has_flows? = socket.assigns.flows - |> Enum.any?(fn {{_client_id, resource_id}, _expires_at} -> resource_id == id end) + |> Enum.any?(fn {{_client_id, resource_id}, _flow_map} -> resource_id == id end) if has_flows? do push(socket, "resource_updated", Views.Resource.render(resource)) @@ -214,49 +226,48 @@ defmodule API.Gateway.Channel do payload: %{joins: joins} }, socket - ) do - if Enum.count(joins) > 0 do - {:ok, relays} = select_relays(socket) + ) + when map_size(joins) > 0 do + {:ok, relays} = select_relays(socket) - if length(relays) > 0 do - relay_credentials_expire_at = DateTime.utc_now() |> DateTime.add(90, :day) + if length(relays) > 0 do + relay_credentials_expire_at = DateTime.utc_now() |> DateTime.add(90, :day) - :ok = - Relays.unsubscribe_from_relays_presence_in_account(socket.assigns.gateway.account_id) + :ok = + Relays.unsubscribe_from_relays_presence_in_account(socket.assigns.gateway.account_id) - :ok = - Enum.each(relays, fn relay -> - # TODO: WAL - # Why are we unsubscribing and subscribing again? - :ok = Relays.unsubscribe_from_relay_presence(relay) - :ok = Relays.subscribe_to_relay_presence(relay) - end) + :ok = + Enum.each(relays, fn relay -> + # TODO: WAL + # Why are we unsubscribing and subscribing again? + :ok = Relays.unsubscribe_from_relay_presence(relay) + :ok = Relays.subscribe_to_relay_presence(relay) + end) - # Cache new stamp secrets - socket = Debouncer.cache_stamp_secrets(socket, relays) + # Cache new stamp secrets + socket = Debouncer.cache_stamp_secrets(socket, relays) - # If a relay reconnects with a different stamp_secret, disconnect them immediately - joined_ids = Map.keys(joins) + # If a relay reconnects with a different stamp_secret, disconnect them immediately + joined_ids = Map.keys(joins) - {socket, disconnected_ids} = - Debouncer.cancel_leaves_or_disconnect_immediately( - socket, - joined_ids, - socket.assigns.gateway.account_id + {socket, disconnected_ids} = + Debouncer.cancel_leaves_or_disconnect_immediately( + socket, + joined_ids, + socket.assigns.gateway.account_id + ) + + push(socket, "relays_presence", %{ + disconnected_ids: disconnected_ids, + connected: + Views.Relay.render_many( + relays, + socket.assigns.gateway.public_key, + relay_credentials_expire_at ) + }) - push(socket, "relays_presence", %{ - disconnected_ids: disconnected_ids, - connected: - Views.Relay.render_many( - relays, - socket.assigns.gateway.public_key, - relay_credentials_expire_at - ) - }) - - {:noreply, socket} - end + {:noreply, socket} else {:noreply, socket} end @@ -321,11 +332,7 @@ defmodule API.Gateway.Channel do gateway_ice_credentials: ice_credentials.gateway, client: Views.Client.render(client, preshared_key), client_ice_credentials: ice_credentials.client, - # Gateway manages its own expiration - expires_at: - if(authorization_expires_at, - do: DateTime.to_unix(authorization_expires_at, :second) - ) + expires_at: DateTime.to_unix(authorization_expires_at, :second) }) # Start tracking flow @@ -366,13 +373,11 @@ defmodule API.Gateway.Channel do {channel_pid, socket_ref, resource.id} ) - expires_at = DateTime.to_unix(authorization_expires_at, :second) - push(socket, "allow_access", %{ ref: ref, client_id: client.id, resource: Views.Resource.render(resource), - expires_at: expires_at, + expires_at: DateTime.to_unix(authorization_expires_at, :second), payload: payload, client_ipv4: client.ipv4, client_ipv6: client.ipv6 @@ -421,13 +426,11 @@ defmodule API.Gateway.Channel do {channel_pid, socket_ref, resource.id} ) - expires_at = DateTime.to_unix(authorization_expires_at, :second) - push(socket, "request_connection", %{ ref: ref, resource: Views.Resource.render(resource), client: Views.Client.render(client, payload, preshared_key), - expires_at: expires_at + expires_at: DateTime.to_unix(authorization_expires_at, :second) }) # Start tracking the flow @@ -638,10 +641,7 @@ defmodule API.Gateway.Channel do # This data structure is used to efficiently: # 1. Check if there are any active flows remaining for this client/resource? # 2. Remove a deleted flow - |> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, inserted_at}}, acc -> - # Assume all flows have a 14 day expiration if they still exist - expires_at = DateTime.add(inserted_at, 14, :day) - + |> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, expires_at}}, acc -> flow_id_map = Map.get(acc, {client_id, resource_id}, %{}) Map.put(acc, {client_id, resource_id}, Map.put(flow_id_map, flow_id, expires_at)) diff --git a/elixir/apps/api/lib/api/gateway/views/flow.ex b/elixir/apps/api/lib/api/gateway/views/flow.ex index 03b776fac..abd6a3f6a 100644 --- a/elixir/apps/api/lib/api/gateway/views/flow.ex +++ b/elixir/apps/api/lib/api/gateway/views/flow.ex @@ -1,10 +1,21 @@ defmodule API.Gateway.Views.Flow do + def render(flow, expires_at) do + %{ + client_id: flow.client_id, + resource_id: flow.resource_id, + expires_at: DateTime.to_unix(expires_at, :second) + } + end + def render_many(flows) do flows - |> Enum.map(fn {{client_id, resource_id}, _flow_map} -> + |> Enum.map(fn {{client_id, resource_id}, flow_map} -> + expires_at = Enum.min(Map.values(flow_map)) + %{ client_id: client_id, - resource_id: resource_id + resource_id: resource_id, + expires_at: DateTime.to_unix(expires_at, :second) } end) end diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index 2f25d843a..aa0b64a6f 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -1073,7 +1073,6 @@ defmodule API.Client.ChannelTest do "connected_gateway_ids" => [] }) - # assert_reply ref, :error, %{reason: :not_found} assert_push "flow_creation_failed", %{reason: :not_found, resource_id: ^resource_id} end @@ -1098,7 +1097,6 @@ defmodule API.Client.ChannelTest do "connected_gateway_ids" => [] }) - # assert_reply ref, :error, %{reason: :offline} assert_push "flow_creation_failed", %{reason: :offline, resource_id: resource_id} assert resource_id == resource.id end @@ -1119,8 +1117,6 @@ defmodule API.Client.ChannelTest do push(socket, "create_flow", attrs) - # assert_reply ref, :error, %{reason: :not_found} - assert_push "flow_creation_failed", %{reason: :not_found, resource_id: resource_id} assert resource_id == resource.id end diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index b0b7b4148..0e441c3ad 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -1027,45 +1027,6 @@ defmodule API.Gateway.ChannelTest do DateTime.truncate(expires_at, :second) end - test "pushes authorize_flow message for authorizations that do not expire", %{ - client: client, - gateway: gateway, - resource: resource, - account: account, - socket: socket - } do - flow = - Fixtures.Flows.create_flow( - account: account, - client: client, - resource: resource - ) - - channel_pid = self() - socket_ref = make_ref() - preshared_key = "PSK" - - ice_credentials = %{ - client: %{username: "A", password: "B"}, - gateway: %{username: "C", password: "D"} - } - - send( - socket.channel_pid, - {{:authorize_flow, gateway.id}, {channel_pid, socket_ref}, - %{ - client: client, - resource: resource, - flow_id: flow.id, - authorization_expires_at: nil, - ice_credentials: ice_credentials, - preshared_key: preshared_key - }} - ) - - assert_push "authorize_flow", %{expires_at: nil} - end - test "authorize_flow tracks flow and sends reject_access when flow is deleted", %{ account: account, client: client, diff --git a/elixir/apps/domain/lib/domain/actors.ex b/elixir/apps/domain/lib/domain/actors.ex index 0acfb21ee..cf1e7f49a 100644 --- a/elixir/apps/domain/lib/domain/actors.ex +++ b/elixir/apps/domain/lib/domain/actors.ex @@ -42,7 +42,7 @@ defmodule Domain.Actors do Membership.Query.all() |> Membership.Query.by_actor_id(actor_id) |> Membership.Query.by_group_id(group_id) - |> Repo.fetch(Membership.Query, []) + |> Repo.fetch(Membership.Query) end def list_groups(%Auth.Subject{} = subject, opts \\ []) do @@ -81,6 +81,13 @@ defmodule Domain.Actors do |> Repo.preload(preload) end + def all_memberships_for_actor!(%Actor{} = actor) do + Membership.Query.all() + |> Membership.Query.by_account_id(actor.account_id) + |> Membership.Query.by_actor_id(actor.id) + |> Repo.all() + end + def list_editable_groups(%Auth.Subject{} = subject, opts \\ []) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_actors_permission()) do Group.Query.not_deleted() diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index e9af27897..79093dfa9 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -39,6 +39,12 @@ defmodule Domain.Clients do |> Repo.aggregate(:count) end + def fetch_client_by_id(id, preload: :identity) do + Client.Query.not_deleted() + |> Client.Query.by_id(id) + |> Repo.fetch(Client.Query, preload: :identity) + end + def fetch_client_by_id(id, %Auth.Subject{} = subject, opts \\ []) do required_permissions = {:one_of, @@ -158,9 +164,12 @@ defmodule Domain.Clients do %{client: %Client{} = client, ipv4: ipv4, ipv6: ipv6} -> Client.Changeset.finalize_upsert(client, ipv4, ipv6) end) + |> Ecto.Multi.run(:client_with_identity, fn _repo, %{client_with_address: client} -> + {:ok, Repo.preload(client, :identity)} + end) |> Repo.transaction() |> case do - {:ok, %{client_with_address: client}} -> {:ok, client} + {:ok, %{client_with_identity: client}} -> {:ok, client} {:error, :client, changeset, _effects_so_far} -> {:error, changeset} end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex index 1a77125fa..2e5f4529a 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/policies.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -12,14 +12,12 @@ defmodule Domain.Events.Hooks.Policies do @impl true # Disable - process as delete - def on_update(%{"disabled_at" => nil}, %{"disabled_at" => disabled_at} = data) when not is_nil(disabled_at) do on_delete(data) end # Enable - process as insert - def on_update(%{"disabled_at" => disabled_at}, %{"disabled_at" => nil} = data) when not is_nil(disabled_at) do on_insert(data) @@ -38,6 +36,8 @@ defmodule Domain.Events.Hooks.Policies do # Breaking updates # This is a special case - we need to delete related flows because connectivity has changed + # The Gateway PID will receive flow deletion messages and process them to potentially reject + # access. The client PID (if connected) will toggle the resource deleted/created. if old_policy.conditions != policy.conditions or old_policy.actor_group_id != policy.actor_group_id or old_policy.resource_id != policy.resource_id do diff --git a/elixir/apps/domain/lib/domain/events/hooks/resources.ex b/elixir/apps/domain/lib/domain/events/hooks/resources.ex index 7cf2615e8..57f1a0521 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resources.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resources.ex @@ -22,9 +22,14 @@ defmodule Domain.Events.Hooks.Resources do resource = SchemaHelpers.struct_from_params(Resources.Resource, data) # Breaking updates + # This is a special case - we need to delete related flows because connectivity has changed # Gateway _does_ handle resource filter changes so we don't need to delete flows - # for those changes + # for those changes - they're processed by the Gateway channel pid. + + # The Gateway channel will process these flow deletions and end up sending reject_access for any + # affected flows. If the client is connected at the time of the update, it will handle this + # by toggling the resource deleted then created. if old_resource.ip_stack != resource.ip_stack or old_resource.type != resource.type or old_resource.address != resource.address do diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 61c38c20b..1c0cb5837 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -21,6 +21,7 @@ defmodule Domain.Flows do }, resource_id, %Policies.Policy{} = policy, + membership_id, %Auth.Subject{ account: %{id: account_id}, actor: %{id: actor_id}, @@ -29,11 +30,10 @@ defmodule Domain.Flows do remote_ip: client_remote_ip, user_agent: client_user_agent } - } = subject + } = subject, + expires_at ) do - with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()), - {:ok, membership} <- - Actors.fetch_membership_by_actor_id_and_group_id(actor_id, policy.actor_group_id) do + with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()) do flow = Flow.Changeset.create(%{ token_id: token_id, @@ -41,11 +41,12 @@ defmodule Domain.Flows do client_id: client_id, gateway_id: gateway_id, resource_id: resource_id, - actor_group_membership_id: membership.id, + actor_group_membership_id: membership_id, account_id: account_id, client_remote_ip: client_remote_ip, client_user_agent: client_user_agent, - gateway_remote_ip: gateway_remote_ip + gateway_remote_ip: gateway_remote_ip, + expires_at: expires_at }) |> Repo.insert!() @@ -53,6 +54,52 @@ defmodule Domain.Flows do end end + # When the last flow in a Gateway's cache is deleted, we need to see if there are + # any other policies potentially authorizing the client before sending reject_access. + # This can happen if a Policy was created that grants redundant access to a client that + # is already connected to the Resource, then the initial Policy is deleted. + # + # We need to create a new flow with the new Policy but the same (or shorter) expiration as + # the old flow. + def reauthorize_flow(%Flow{} = flow) do + with {:ok, client} <- Clients.fetch_client_by_id(flow.client_id, preload: :identity), + policies when policies != [] <- + Policies.all_policies_for_resource_id!( + flow.account_id, + flow.resource_id + ), + conforming_policies when conforming_policies != [] <- + Policies.filter_by_conforming_policies_for_client(policies, client), + policy <- Enum.at(conforming_policies, 0), + {:ok, expires_at} <- Policies.ensure_client_conforms_policy_conditions(client, policy), + {:ok, membership} <- + Actors.fetch_membership_by_actor_id_and_group_id( + client.actor_id, + policy.actor_group_id + ), + {:ok, new_flow} <- + Flow.Changeset.create(%{ + token_id: flow.token_id, + policy_id: Enum.at(conforming_policies, 0).id, + client_id: flow.client_id, + gateway_id: flow.gateway_id, + resource_id: flow.resource_id, + actor_group_membership_id: membership.id, + account_id: flow.account_id, + client_remote_ip: client.last_seen_remote_ip, + client_user_agent: client.last_seen_user_agent, + gateway_remote_ip: flow.gateway_remote_ip, + expires_at: expires_at || flow.expires_at + }) + |> Repo.insert() do + {:ok, new_flow} + else + reason -> + Logger.info("Failed to reauthorize flow: #{inspect(reason)}") + {:error, :forbidden} + end + end + def fetch_flow_by_id(id, %Auth.Subject{} = subject, opts \\ []) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_flows_permission()), true <- Repo.valid_uuid?(id) do @@ -70,6 +117,7 @@ defmodule Domain.Flows do Flow.Query.all() |> Flow.Query.by_account_id(gateway.account_id) |> Flow.Query.by_gateway_id(gateway.id) + |> Flow.Query.not_expired() |> Flow.Query.for_cache() |> Repo.all() end diff --git a/elixir/apps/domain/lib/domain/flows/flow.ex b/elixir/apps/domain/lib/domain/flows/flow.ex index b5e421cc1..ce7797b53 100644 --- a/elixir/apps/domain/lib/domain/flows/flow.ex +++ b/elixir/apps/domain/lib/domain/flows/flow.ex @@ -11,11 +11,13 @@ defmodule Domain.Flows.Flow do belongs_to :account, Domain.Accounts.Account + # TODO: These can be removed since we don't use them field :client_remote_ip, Domain.Types.IP field :client_user_agent, :string - field :gateway_remote_ip, Domain.Types.IP + field :expires_at, :utc_datetime_usec + timestamps(updated_at: false) end end diff --git a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex index 012a3ddd0..08bc9a9dd 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex @@ -4,6 +4,7 @@ defmodule Domain.Flows.Flow.Changeset do @fields ~w[token_id policy_id client_id gateway_id resource_id actor_group_membership_id account_id + expires_at client_remote_ip client_user_agent gateway_remote_ip]a diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index 7edc048bd..ef10e103f 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -5,6 +5,11 @@ defmodule Domain.Flows.Flow.Query do from(flows in Domain.Flows.Flow, as: :flows) end + def not_expired(queryable) do + now = DateTime.utc_now() + where(queryable, [flows: flows], flows.expires_at > ^now) + end + def by_id(queryable, id) do where(queryable, [flows: flows], flows.id == ^id) end @@ -21,14 +26,11 @@ defmodule Domain.Flows.Flow.Query do where(queryable, [flows: flows], flows.policy_id == ^policy_id) end - # Return the latest {client_id, resource_id} pairs over the last 14 days def for_cache(queryable) do - cutoff = DateTime.utc_now() |> DateTime.add(-14, :day) - - where(queryable, [flows: flows], flows.inserted_at > ^cutoff) + queryable |> select( [flows: flows], - {{flows.client_id, flows.resource_id}, {flows.id, flows.inserted_at}} + {{flows.client_id, flows.resource_id}, {flows.id, flows.expires_at}} ) end diff --git a/elixir/apps/domain/lib/domain/policies.ex b/elixir/apps/domain/lib/domain/policies.ex index 0be4bcf68..f1aafe220 100644 --- a/elixir/apps/domain/lib/domain/policies.ex +++ b/elixir/apps/domain/lib/domain/policies.ex @@ -74,6 +74,13 @@ defmodule Domain.Policies do |> Repo.all() end + def all_policies_for_resource_id!(account_id, resource_id) do + Policy.Query.not_disabled() + |> Policy.Query.by_account_id(account_id) + |> Policy.Query.by_resource_id(resource_id) + |> Repo.all() + end + def new_policy(attrs, %Auth.Subject{} = subject) do Policy.Changeset.create(attrs, subject) end diff --git a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex index 1a4531b37..364ef145e 100644 --- a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex +++ b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex @@ -1,5 +1,4 @@ defmodule Domain.Policies.Condition.Evaluator do - alias Domain.Repo alias Domain.Clients alias Domain.Policies.Condition @@ -17,8 +16,6 @@ defmodule Domain.Policies.Condition.Evaluator do end def ensure_conforms(conditions, %Clients.Client{} = client) when is_list(conditions) do - client = Repo.preload(client, :identity) - conditions |> Enum.reduce({[], nil}, fn condition, {violated_properties, min_expires_at} -> if condition.property in violated_properties do @@ -102,8 +99,6 @@ defmodule Domain.Policies.Condition.Evaluator do %Condition{property: :provider_id, operator: :is_in, values: values}, %Clients.Client{} = client ) do - client = Repo.preload(client, :identity) - if client.identity.provider_id in values do {:ok, nil} else @@ -115,8 +110,6 @@ defmodule Domain.Policies.Condition.Evaluator do %Condition{property: :provider_id, operator: :is_not_in, values: values}, %Clients.Client{} = client ) do - client = Repo.preload(client, :identity) - if client.identity.provider_id in values do :error else diff --git a/elixir/apps/domain/lib/domain/policies/policy/query.ex b/elixir/apps/domain/lib/domain/policies/policy/query.ex index dff77a258..6fcfc06a5 100644 --- a/elixir/apps/domain/lib/domain/policies/policy/query.ex +++ b/elixir/apps/domain/lib/domain/policies/policy/query.ex @@ -42,6 +42,12 @@ defmodule Domain.Policies.Policy.Query do where(queryable, [policies: policies], policies.resource_id in ^resource_ids) end + def by_actor_group_membership_id(queryable, membership_id) do + queryable + |> with_joined_memberships() + |> where([memberships: memberships], memberships.id == ^membership_id) + end + def by_actor_group_id(queryable, actor_group_id) do queryable |> where([policies: policies], policies.actor_group_id == ^actor_group_id) diff --git a/elixir/apps/domain/priv/repo/manual_migrations/20250722233834_index_flows_on_expires_at.exs b/elixir/apps/domain/priv/repo/manual_migrations/20250722233834_index_flows_on_expires_at.exs new file mode 100644 index 000000000..c0ace1722 --- /dev/null +++ b/elixir/apps/domain/priv/repo/manual_migrations/20250722233834_index_flows_on_expires_at.exs @@ -0,0 +1,11 @@ +defmodule Domain.Repo.Migrations.IndexFlowsOnExpiresAt do + use Ecto.Migration + + @disable_ddl_transaction true + + def change do + create_if_not_exists( + index(:flows, [:account_id, :expires_at, :gateway_id], concurrently: true) + ) + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250722232957_add_expires_at_to_flows.exs b/elixir/apps/domain/priv/repo/migrations/20250722232957_add_expires_at_to_flows.exs new file mode 100644 index 000000000..4f6f3fa0d --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250722232957_add_expires_at_to_flows.exs @@ -0,0 +1,21 @@ +defmodule Domain.Repo.Migrations.AddExpiresAtToFlows do + use Ecto.Migration + + def change do + alter(table(:flows)) do + add_if_not_exists(:expires_at, :utc_datetime_usec) + end + + # Unfortunately cross-referencing any related token or policy expiration + # to improve this is too complex to be considered worth it at this point in time. + execute(""" + UPDATE flows + SET expires_at = NOW() + INTERVAL '14 days' + """) + + execute(""" + ALTER TABLE flows + ALTER COLUMN expires_at SET NOT NULL + """) + end +end diff --git a/elixir/apps/domain/priv/repo/seeds.exs b/elixir/apps/domain/priv/repo/seeds.exs index c158e162d..faab1b73b 100644 --- a/elixir/apps/domain/priv/repo/seeds.exs +++ b/elixir/apps/domain/priv/repo/seeds.exs @@ -1175,13 +1175,21 @@ defmodule Domain.Repo.Seeds do IO.puts("") + membership = + Repo.get_by(Domain.Actors.Membership, + group_id: synced_group.id, + actor_id: unprivileged_actor.id + ) + {:ok, _flow} = Flows.create_flow( user_iphone, gateway1, cidr_resource.id, policy, - unprivileged_subject + membership.id, + unprivileged_subject, + unprivileged_subject.expires_at ) end end diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index ebfa8c38b..c25adffeb 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -108,7 +108,10 @@ defmodule Domain.ClientsTest do test "returns client by id", %{unprivileged_actor: actor, unprivileged_subject: subject} do client = Fixtures.Clients.create_client(actor: actor) - assert fetch_client_by_id(client.id, subject, preload: [:online?]) == {:ok, client} + client = Repo.preload(client, :identity) + + assert fetch_client_by_id(client.id, subject, preload: [:online?, :identity]) == + {:ok, client} end test "preloads online status", %{unprivileged_actor: actor, unprivileged_subject: subject} do @@ -135,7 +138,8 @@ defmodule Domain.ClientsTest do |> Fixtures.Auth.remove_permissions() |> Fixtures.Auth.add_permission(Clients.Authorizer.manage_clients_permission()) - assert fetch_client_by_id(client.id, subject, preload: [:online?]) == {:ok, client} + assert fetch_client_by_id(client.id, subject, preload: [:online?, :identity]) == + {:ok, client} end test "does not returns client that belongs to another account with manage permission", %{ @@ -215,7 +219,7 @@ defmodule Domain.ClientsTest do test "returns client by id", %{unprivileged_actor: actor} do client = Fixtures.Clients.create_client(actor: actor) - assert fetch_client_by_id!(client.id, preload: [:online?]) == client + assert fetch_client_by_id!(client.id, preload: [:online?, :identity]) == client end test "preloads online status", %{unprivileged_actor: actor} do @@ -357,7 +361,7 @@ defmodule Domain.ClientsTest do Fixtures.Clients.create_client(actor: other_actor) assert {:ok, [^client], _metadata} = - list_clients_by_actor_id(actor.id, subject, preload: [:online?]) + list_clients_by_actor_id(actor.id, subject, preload: [:online?, :identity]) assert {:ok, [], _metadata} = list_clients_by_actor_id(other_actor.id, subject) end @@ -993,7 +997,7 @@ defmodule Domain.ClientsTest do for field <- fields do assert {:ok, updated_client} = update_client(client, %{field => value}, subject) - assert updated_client == client + assert Repo.preload(updated_client, :identity) == client end end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 6b9a2e68d..148646e20 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -54,7 +54,7 @@ defmodule Domain.FlowsTest do } end - describe "create_flow/4" do + describe "create_flow/7" do # test "returns error when resource does not exist", %{ # client: client, # gateway: gateway, @@ -258,10 +258,19 @@ defmodule Domain.FlowsTest do client = Fixtures.Clients.create_client(account: account, actor: actor) subject = Fixtures.Auth.create_subject(account: account, actor: actor) - Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) + membership = + Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) assert {:ok, %Flows.Flow{} = flow} = - create_flow(client, gateway, resource.id, policy, subject) + create_flow( + client, + gateway, + resource.id, + policy, + membership.id, + subject, + subject.expires_at + ) assert flow.policy_id == policy.id assert flow.client_id == client.id @@ -271,6 +280,8 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip + assert flow.actor_group_membership_id == membership.id + assert flow.expires_at == subject.expires_at end test "creates a new flow for service accounts", %{ @@ -281,7 +292,9 @@ defmodule Domain.FlowsTest do policy: policy } do actor = Fixtures.Actors.create_actor(type: :service_account, account: account) - Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) + + membership = + Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) identity = Fixtures.Auth.create_identity(account: account, actor: actor) subject = Fixtures.Auth.create_subject(identity: identity) @@ -289,7 +302,15 @@ defmodule Domain.FlowsTest do client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity) assert {:ok, %Flows.Flow{} = flow} = - create_flow(client, gateway, resource.id, policy, subject) + create_flow( + client, + gateway, + resource.id, + policy, + membership.id, + subject, + subject.expires_at + ) assert flow.policy_id == policy.id assert flow.client_id == client.id @@ -299,6 +320,8 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip + assert flow.actor_group_membership_id == membership.id + assert flow.expires_at == subject.expires_at end # TODO: Rename Flows @@ -332,6 +355,103 @@ defmodule Domain.FlowsTest do # end end + describe "reauthorize_flow/1" do + test "when another valid policy exists for the resource", + %{ + account: account, + actor: actor, + membership: membership, + subject: subject, + client: client, + gateway: gateway, + resource: resource, + policy: policy + } do + other_group = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: other_group + ) + + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + actor_group_membership: membership, + policy: policy, + resource: resource, + gateway: gateway + ) + + Fixtures.Policies.create_policy( + account: account, + actor_group: other_group, + resource: resource, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: [client.last_seen_remote_ip_location_region] + } + ] + ) + + assert {:ok, reauthorized_flow} = reauthorize_flow(flow) + assert reauthorized_flow.resource_id == flow.resource_id + end + + test "when no more valid policies exist for the resource", + %{ + account: account, + actor: actor, + membership: membership, + subject: subject, + client: client, + gateway: gateway, + resource: resource, + policy: policy + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + actor_group_membership: membership, + policy: policy, + resource: resource, + gateway: gateway + ) + + other_group = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: other_group + ) + + Repo.delete_all(Domain.Policies.Policy) + + Fixtures.Policies.create_policy( + account: account, + actor_group: other_group, + resource: resource, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["AU"] + } + ] + ) + + assert {:error, :forbidden} = reauthorize_flow(flow) + end + end + describe "fetch_flow_by_id/3" do test "returns error when flow does not exist", %{subject: subject} do assert fetch_flow_by_id(Ecto.UUID.generate(), subject) == {:error, :not_found} @@ -458,8 +578,8 @@ defmodule Domain.FlowsTest do flows = all_gateway_flows_for_cache!(gateway) - assert {{flow1.client_id, flow1.resource_id}, {flow1.id, flow1.inserted_at}} in flows - assert {{flow2.client_id, flow2.resource_id}, {flow2.id, flow2.inserted_at}} in flows + assert {{flow1.client_id, flow1.resource_id}, {flow1.id, flow1.expires_at}} in flows + assert {{flow2.client_id, flow2.resource_id}, {flow2.id, flow2.expires_at}} in flows end end diff --git a/elixir/apps/domain/test/support/fixtures/flows.ex b/elixir/apps/domain/test/support/fixtures/flows.ex index e3c1702f1..c503d5235 100644 --- a/elixir/apps/domain/test/support/fixtures/flows.ex +++ b/elixir/apps/domain/test/support/fixtures/flows.ex @@ -90,7 +90,8 @@ defmodule Domain.Fixtures.Flows do account_id: account.id, client_remote_ip: client.last_seen_remote_ip, client_user_agent: client.last_seen_user_agent, - gateway_remote_ip: gateway.last_seen_remote_ip + gateway_remote_ip: gateway.last_seen_remote_ip, + expires_at: subject.expires_at || DateTime.utc_now() |> DateTime.add(14, :day) }) |> Repo.insert!() end