From 54d91e2004251d0b14340fdedaf6001d4f1845d8 Mon Sep 17 00:00:00 2001 From: Jamil Date: Thu, 31 Jul 2025 20:03:00 -0400 Subject: [PATCH] fix(portal): don't send reject_access for remaining flows (#10071) This fixes a simple logic bug where we were mistakenly reacting to a flow deletion event where flows still existed in the cache by sending `reject_access`. This fixes that bug, and adds more comprehensive logging to help diagnose issues like this more quickly in the future. This PR also fixes the following issues found during the investigation: - We were redundantly reacting to Token deletion in the channel pids. This is unnecessary: we send a global socket disconnect from the Token hook module instead. - We had a bug that would crash the WAL consumer if a "global" token (i.e. relay) was deleted or expired - these have no `account_id`. - We now always use `min(max(all_conforming_polices_expiration), token.expires_at)` when setting expiration on a new flow to minimize the possibility for access churn. - We now check to ensure the token and gateway are still undeleted when re-authorizing a given flow. This prevents us from failing to send `reject_access` when a token or gateway is deleted corresponding to a flow, but the other entities would have granted access. Related: https://firezone.statuspage.io/incidents/xrsm13tml3dh Related: #10068 Related: #9501 --- elixir/apps/api/lib/api/client/channel.ex | 74 +--- elixir/apps/api/lib/api/gateway/channel.ex | 85 ++-- .../apps/api/test/api/client/channel_test.exs | 30 +- .../api/test/api/gateway/channel_test.exs | 48 ++- elixir/apps/domain/lib/domain/flows.ex | 52 ++- elixir/apps/domain/lib/domain/gateways.ex | 6 + elixir/apps/domain/lib/domain/policies.ex | 62 ++- .../domain/policies/condition/evaluator.ex | 7 - elixir/apps/domain/lib/domain/pubsub.ex | 8 + elixir/apps/domain/lib/domain/tokens.ex | 7 + .../apps/domain/test/domain/policies_test.exs | 402 ++++++++++++++++-- 11 files changed, 597 insertions(+), 184 deletions(-) diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index ea0e4fc13..e2b3aed60 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -12,8 +12,7 @@ defmodule API.Client.Channel do Gateways, Relays, Policies, - Flows, - Tokens + Flows } alias Domain.Relays.Presence.Debouncer @@ -40,8 +39,7 @@ defmodule API.Client.Channel do @impl true def join("client", _payload, socket) do - with {:ok, socket} <- schedule_expiration(socket), - {:ok, gateway_version_requirement} <- + with {:ok, gateway_version_requirement} <- select_gateway_version_requirement(socket.assigns.client) do socket = assign(socket, gateway_version_requirement: gateway_version_requirement) @@ -51,19 +49,6 @@ defmodule API.Client.Channel do end end - defp schedule_expiration(%{assigns: %{subject: %{expires_at: expires_at}}} = socket) do - expires_in = - expires_at - |> DateTime.diff(DateTime.utc_now(), :millisecond) - - if expires_in > 0 do - Process.send_after(self(), :token_expired, expires_in) - {:ok, socket} - else - {:error, %{reason: :token_expired}} - end - end - @impl true # Called immediately after the client joins the channel @@ -542,26 +527,6 @@ defmodule API.Client.Channel do end end - # TOKENS - - def handle_info( - {:deleted, %Tokens.Token{type: :client, id: id}}, - %{assigns: %{subject: %{token_id: token_id}}} = socket - ) - when id == token_id do - disconnect(socket) - end - - #################################### - ##### Reacting to timed events ##### - #################################### - - # Message is scheduled by schedule_expiration/1 on topic join to be sent - # when the client token/subject expires - def handle_info(:token_expired, socket) do - disconnect(socket) - end - #################################### #### Reacting to relay presence #### #################################### @@ -758,7 +723,7 @@ defmodule API.Client.Channel do } with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateways} when gateways != [] <- Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject, preload: :group @@ -863,7 +828,7 @@ defmodule API.Client.Channel do # TODO: Optimization # Gateway selection and flow authorization shouldn't need to hit the DB with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, _policy, _expires_at} <- authorize_resource(socket, resource_id), + {:ok, _expires_at, _policy} <- authorize_resource(socket, resource_id), {:ok, [_ | _] = gateways} <- Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject, preload: :group @@ -920,7 +885,7 @@ defmodule API.Client.Channel do socket ) do with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization @@ -980,7 +945,7 @@ defmodule API.Client.Channel do ) do # Flow authorization can happen out-of-band since we just authorized the resource above with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization @@ -1285,7 +1250,7 @@ defmodule API.Client.Channel do end end - # Returns either the authorized policy or an error tuple of violated properties + # Returns either the longest authorized policy or an error tuple of violated properties defp authorize_resource(socket, resource_id) do OpenTelemetry.Tracer.with_span "client.authorize_resource", attributes: %{ @@ -1294,27 +1259,10 @@ defmodule API.Client.Channel do socket.assigns.policies |> Enum.filter(fn {_id, policy} -> policy.resource_id == resource_id end) |> Enum.map(fn {_id, policy} -> policy end) - |> Enum.reduce_while({:error, []}, fn policy, {:error, acc} -> - case Policies.ensure_client_conforms_policy_conditions(socket.assigns.client, policy) do - {:ok, expires_at} -> - {:halt, {:ok, policy, expires_at}} - - {:error, {:forbidden, violated_properties: violated_properties}} -> - {:cont, {:error, violated_properties ++ acc}} - end - end) - |> case do - {:error, violated_properties} -> - {:error, {:forbidden, violated_properties: violated_properties}} - - {:ok, policy, expires_at} -> - # Set a maximum expiration time for the authorization - expires_at = - expires_at || socket.assigns.subject.expires_at || - DateTime.utc_now() |> DateTime.add(14, :day) - - {:ok, policy, expires_at} - end + |> Policies.longest_conforming_policy_for_client( + socket.assigns.client, + socket.assigns.subject.expires_at + ) end end end diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index f1530956e..8e350e35e 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources, Tokens} + alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -110,38 +110,67 @@ defmodule API.Gateway.Channel do when gateway_id == id do tuple = {flow.client_id, flow.resource_id} - socket = - if flows_map = Map.get(socket.assigns.flows, tuple) do - flow_map = Map.delete(flows_map, flow.id) + if flows_map = Map.get(socket.assigns.flows, tuple) do + flow_map = Map.delete(flows_map, flow.id) + remaining = map_size(flow_map) - with 0 <- map_size(flow_map), - {:ok, new_flow} <- Flows.reauthorize_flow(flow) do - flow_map = %{ - new_flow.id => new_flow.expires_at - } + if remaining == 0 do + case Flows.reauthorize_flow(flow) do + {:ok, new_flow} -> + flow_map = %{ + new_flow.id => new_flow.expires_at + } - push( - socket, - "access_authorization_expiry_updated", - Views.Flow.render(new_flow, new_flow.expires_at) - ) + Logger.info("Updated flow authorization", + old_flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id, + new_flow_id: new_flow.id + ) + + push( + socket, + "access_authorization_expiry_updated", + Views.Flow.render(new_flow, new_flow.expires_at) + ) + + {:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))} + + {:error, :forbidden} -> + Logger.info("Last flow deleted; revoking access", + flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id + ) - assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map)) - else - _ -> # Send reject_access if access is no longer granted + + # TODO: Verify that if the client's websocket connection flaps at the moment this + # message is received by the gateway, and the client still has access to this resource, + # the client will clear its state so it can request a new flow. push(socket, "reject_access", %{ client_id: flow.client_id, resource_id: flow.resource_id }) - assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + {:noreply, assign(socket, flows: Map.delete(socket.assigns.flows, tuple))} end else - socket - end + Logger.info("Flow deleted but still has remaining flows for client/resource", + flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id, + remaining: remaining + ) - {:noreply, socket} + {:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))} + end + else + {:noreply, socket} + end end # RESOURCES @@ -165,14 +194,6 @@ defmodule API.Gateway.Channel do {:noreply, socket} end - # TOKENS - - # Our gateway token was deleted - disconnect WebSocket - def handle_info({:deleted, %Tokens.Token{type: :gateway_group, id: id}}, socket) - when id == socket.assigns.token_id do - disconnect(socket) - end - #################################### #### Reacting to relay presence #### #################################### @@ -650,10 +671,4 @@ defmodule API.Gateway.Channel do assign(socket, flows: flows) end end - - defp disconnect(socket) do - push(socket, "disconnect", %{"reason" => "token_expired"}) - send(socket.transport_pid, %Phoenix.Socket.Broadcast{event: "disconnect"}) - {:stop, :shutdown, socket} - end end diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index aa0b64a6f..1370e6525 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -200,14 +200,16 @@ defmodule API.Client.ChannelTest do refute_receive {:socket_close, _pid, _} end - test "expires the channel when token is expired", %{client: client, subject: subject} do - expires_at = DateTime.utc_now() |> DateTime.add(25, :millisecond) - subject = %{subject | expires_at: expires_at} - + test "send disconnect broadcast when the token is deleted", %{ + client: client, + subject: subject + } do # We need to trap exits to avoid test process termination # because it is linked to the created test channel process Process.flag(:trap_exit, true) + :ok = Domain.PubSub.subscribe("sessions:#{subject.token_id}") + {:ok, _reply, _socket} = API.Client.Socket |> socket("client:#{client.id}", %{ @@ -216,9 +218,23 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - assert_push "disconnect", %{reason: :token_expired}, 250 - assert_receive {:EXIT, _pid, :shutdown} - assert_receive {:socket_close, _pid, :shutdown} + token = Repo.get_by(Domain.Tokens.Token, id: subject.token_id) + + data = %{ + "id" => token.id, + "account_id" => token.account_id, + "type" => token.type, + "expires_at" => token.expires_at + } + + Domain.Events.Hooks.Tokens.on_delete(data) + + assert_receive %Phoenix.Socket.Broadcast{ + topic: topic, + event: "disconnect" + } + + assert topic == "sessions:#{token.id}" end test "selects compatible gateway versions", %{client: client, subject: subject} do diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 0e441c3ad..f6aaedfd1 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -126,6 +126,7 @@ defmodule API.Gateway.ChannelTest do Process.flag(:trap_exit, true) :ok = Domain.PubSub.Account.subscribe(account.id) + :ok = Domain.PubSub.subscribe("sessions:#{token.id}") data = %{ "id" => token.id, @@ -136,11 +137,14 @@ defmodule API.Gateway.ChannelTest do Events.Hooks.Tokens.on_delete(data) assert_receive {:deleted, deleted_token} - assert_push "disconnect", payload - assert_receive {:EXIT, _pid, _} - assert_receive {:socket_close, _pid, _} + + assert_receive %Phoenix.Socket.Broadcast{ + topic: topic, + event: "disconnect" + } + + assert topic == "sessions:#{token.id}" assert deleted_token.id == token.id - assert payload == %{"reason" => "token_expired"} end test "pushes allow_access message", %{ @@ -294,7 +298,11 @@ defmodule API.Gateway.ChannelTest do "id" => flow1.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "token_id" => flow1.token_id, + "policy_id" => flow1.policy_id, + "actor_group_membership_id" => flow1.actor_group_membership_id, + "expires_at" => flow1.expires_at } send( @@ -361,7 +369,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } send( @@ -499,7 +511,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => other_flow1.client_id, "resource_id" => other_flow1.resource_id, "account_id" => other_flow1.account_id, - "gateway_id" => other_flow1.gateway_id + "gateway_id" => other_flow1.gateway_id, + "token_id" => other_flow1.token_id, + "policy_id" => other_flow1.policy_id, + "actor_group_membership_id" => other_flow1.actor_group_membership_id, + "expires_at" => other_flow1.expires_at } Events.Hooks.Flows.on_delete(data) @@ -517,7 +533,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => other_flow2.client_id, "resource_id" => other_flow2.resource_id, "account_id" => other_flow2.account_id, - "gateway_id" => other_flow2.gateway_id + "gateway_id" => other_flow2.gateway_id, + "token_id" => other_flow2.token_id, + "policy_id" => other_flow2.policy_id, + "actor_group_membership_id" => other_flow2.actor_group_membership_id, + "expires_at" => other_flow2.expires_at } Events.Hooks.Flows.on_delete(data) @@ -944,7 +964,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } Events.Hooks.Flows.on_delete(data) @@ -1074,7 +1098,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } Events.Hooks.Flows.on_delete(data) diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 3e240e791..7488ccff6 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -1,6 +1,6 @@ defmodule Domain.Flows do alias Domain.Repo - alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies} + alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies, Tokens} alias Domain.Flows.{Authorizer, Flow} require Ecto.Query require Logger @@ -34,6 +34,11 @@ defmodule Domain.Flows do expires_at ) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()) do + # Set the expiration time for the authorization. For normal user clients, this is 7 days, defined in Domain.Auth. + # For service accounts using a headless client, it's configurable. This will always be set. + # We always cap this by the subject's expires_at so that we synchronize removal of the flow on the gateway + # with the websocket connection expiration on the client. + flow = Flow.Changeset.create(%{ token_id: token_id, @@ -59,19 +64,28 @@ defmodule Domain.Flows do # This can happen if a Policy was created that grants redundant access to a client that # is already connected to the Resource, then the initial Policy is deleted. # - # We need to create a new flow with the new Policy but the same (or shorter) expiration as - # the old flow. + # We need to create a new flow with the new Policy. Unfortunately getting the expiration + # right is a bit tricky, since we need to synchronize this with the client's state. + # If we expire the flow too early, the client will lose access to the Resource without + # any change in client state. If we expire the flow too late, the client will have access + # to the Resource beyond its intended expiration time. + # + # So, we use the minimum of either the policy condition or the origin flow's expiration time. def reauthorize_flow(%Flow{} = flow) do with {:ok, client} <- Clients.fetch_client_by_id(flow.client_id, preload: :identity), + # TODO: Hard delete + # We need to ensure token and gateway haven't been deleted after the initial flow was created + # This can be removed after hard-delete since we'll get a DB error if these associations no longer exist + {:ok, _token} <- Tokens.fetch_token_by_id(flow.token_id), + {:ok, _gateway} <- Gateways.fetch_gateway_by_id(flow.gateway_id), policies when policies != [] <- - Policies.all_policies_for_resource_id!( + Policies.all_policies_for_resource_id_and_actor_id!( flow.account_id, - flow.resource_id + flow.resource_id, + client.actor_id ), - conforming_policies when conforming_policies != [] <- - Policies.filter_by_conforming_policies_for_client(policies, client), - policy <- Enum.at(conforming_policies, 0), - {:ok, expires_at} <- Policies.ensure_client_conforms_policy_conditions(client, policy), + {:ok, expires_at, policy} <- + Policies.longest_conforming_policy_for_client(policies, client, flow.expires_at), {:ok, membership} <- Actors.fetch_membership_by_actor_id_and_group_id( client.actor_id, @@ -80,7 +94,7 @@ defmodule Domain.Flows do {:ok, new_flow} <- Flow.Changeset.create(%{ token_id: flow.token_id, - policy_id: Enum.at(conforming_policies, 0).id, + policy_id: policy.id, client_id: flow.client_id, gateway_id: flow.gateway_id, resource_id: flow.resource_id, @@ -89,13 +103,16 @@ defmodule Domain.Flows do client_remote_ip: client.last_seen_remote_ip, client_user_agent: client.last_seen_user_agent, gateway_remote_ip: flow.gateway_remote_ip, - expires_at: expires_at || flow.expires_at + expires_at: expires_at }) |> Repo.insert() do {:ok, new_flow} else reason -> - Logger.info("Failed to reauthorize flow: #{inspect(reason)}") + Logger.info("Failed to reauthorize flow", + reason: inspect(reason) + ) + {:error, :forbidden} end end @@ -209,10 +226,15 @@ defmodule Domain.Flows do |> Repo.delete_all() end - def delete_flows_for(%Domain.Tokens.Token{} = token) do + def delete_flows_for(%Domain.Tokens.Token{account_id: nil}) do + # Tokens without an account_id are not associated with any flows. I.e. global relay tokens + {0, []} + end + + def delete_flows_for(%Domain.Tokens.Token{id: id, account_id: account_id}) do Flow.Query.all() - |> Flow.Query.by_account_id(token.account_id) - |> Flow.Query.by_token_id(token.id) + |> Flow.Query.by_account_id(account_id) + |> Flow.Query.by_token_id(id) |> Repo.delete_all() end diff --git a/elixir/apps/domain/lib/domain/gateways.ex b/elixir/apps/domain/lib/domain/gateways.ex index 4dfc1d0ca..209ec0e98 100644 --- a/elixir/apps/domain/lib/domain/gateways.ex +++ b/elixir/apps/domain/lib/domain/gateways.ex @@ -23,6 +23,12 @@ defmodule Domain.Gateways do |> Repo.aggregate(:count) end + def fetch_gateway_by_id(id) do + Gateway.Query.not_deleted() + |> Gateway.Query.by_id(id) + |> Repo.fetch(Gateway.Query, []) + end + def fetch_group_by_id(id, %Auth.Subject{} = subject, opts \\ []) do required_permissions = {:one_of, diff --git a/elixir/apps/domain/lib/domain/policies.ex b/elixir/apps/domain/lib/domain/policies.ex index f1aafe220..92b3580f0 100644 --- a/elixir/apps/domain/lib/domain/policies.ex +++ b/elixir/apps/domain/lib/domain/policies.ex @@ -74,10 +74,11 @@ defmodule Domain.Policies do |> Repo.all() end - def all_policies_for_resource_id!(account_id, resource_id) do + def all_policies_for_resource_id_and_actor_id!(account_id, resource_id, actor_id) do Policy.Query.not_disabled() |> Policy.Query.by_account_id(account_id) |> Policy.Query.by_resource_id(resource_id) + |> Policy.Query.by_actor_id(actor_id) |> Repo.all() end @@ -187,7 +188,6 @@ defmodule Domain.Policies do def filter_by_conforming_policies_for_client(policies, %Clients.Client{} = client) do Enum.filter(policies, fn policy -> policy.conditions - |> Enum.filter(&Condition.Evaluator.evaluable_on_connect?/1) |> Condition.Evaluator.ensure_conforms(client) |> case do {:ok, _expires_at} -> true @@ -196,7 +196,55 @@ defmodule Domain.Policies do end) end - def ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do + @infinity ~U[9999-12-31 23:59:59.999999Z] + + def longest_conforming_policy_for_client(policies, client, token_expires_at) do + policies + |> Enum.reduce(%{failed: [], succeeded: []}, fn policy, acc -> + case ensure_client_conforms_policy_conditions(client, policy) do + {:ok, expires_at} -> + %{acc | succeeded: [{expires_at, policy} | acc.succeeded]} + + {:error, {:forbidden, violated_properties: violated_properties}} -> + %{acc | failed: acc.failed ++ violated_properties} + end + end) + |> case do + %{succeeded: [], failed: failed} -> + {:error, {:forbidden, violated_properties: Enum.uniq(failed)}} + + %{succeeded: succeeded} -> + {expires_at, policy} = + succeeded + |> Enum.max_by(fn {expires_at, _policy} -> expires_at || @infinity end) + + {:ok, min_expires_at(expires_at, token_expires_at), policy} + end + end + + # All client tokens have *some* expiration + def min_expires_at(nil, nil), + do: raise("Both policy_expires_at and token_expires_at cannot be nil") + + def min_expires_at(nil, token_expires_at), do: token_expires_at + + def min_expires_at(%DateTime{} = policy_expires_at, %DateTime{} = token_expires_at) do + if DateTime.compare(policy_expires_at, token_expires_at) == :lt do + policy_expires_at + else + token_expires_at + end + end + + defp ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do + if subject.account.id == policy.account_id do + :ok + else + {:error, :unauthorized} + end + end + + defp ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do case Condition.Evaluator.ensure_conforms(policy.conditions, client) do {:ok, expires_at} -> {:ok, expires_at} @@ -205,12 +253,4 @@ defmodule Domain.Policies do {:error, {:forbidden, violated_properties: violated_properties}} end end - - def ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do - if subject.account.id == policy.account_id do - :ok - else - {:error, :unauthorized} - end - end end diff --git a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex index 364ef145e..d14ec6e97 100644 --- a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex +++ b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex @@ -4,13 +4,6 @@ defmodule Domain.Policies.Condition.Evaluator do @days_of_week ~w[M T W R F S U] - @doc """ - Returns `true` if the condition can be evaluated during connection (eg. for IP address matching - it can't change while socket is open), otherwise `false`. - """ - def evaluable_on_connect?(%Condition{property: "current_utc_datetime"}), do: false - def evaluable_on_connect?(_), do: true - def ensure_conforms([], %Clients.Client{}) do {:ok, nil} end diff --git a/elixir/apps/domain/lib/domain/pubsub.ex b/elixir/apps/domain/lib/domain/pubsub.ex index 0e50af306..721d5b31e 100644 --- a/elixir/apps/domain/lib/domain/pubsub.ex +++ b/elixir/apps/domain/lib/domain/pubsub.ex @@ -5,6 +5,8 @@ defmodule Domain.PubSub do """ use Supervisor + require Logger + def start_link(opts) do Supervisor.start_link(__MODULE__, opts) end @@ -49,6 +51,12 @@ defmodule Domain.PubSub do |> Domain.PubSub.subscribe() end + def broadcast(nil, payload) do + Logger.warning("Broadcasting to nil account_id is not allowed", + payload: inspect(payload) + ) + end + def broadcast(account_id, payload) do account_id |> topic() diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index a3961339a..f4327adf3 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -19,6 +19,13 @@ defmodule Domain.Tokens do Supervisor.init(children, strategy: :one_for_one) end + def fetch_token_by_id(id) do + Token.Query.not_deleted() + |> Token.Query.not_expired() + |> Token.Query.by_id(id) + |> Repo.fetch(Token.Query, []) + end + def fetch_token_by_id(id, %Auth.Subject{} = subject, opts \\ []) do required_permissions = {:one_of, diff --git a/elixir/apps/domain/test/domain/policies_test.exs b/elixir/apps/domain/test/domain/policies_test.exs index 7a55c7d75..9738934cd 100644 --- a/elixir/apps/domain/test/domain/policies_test.exs +++ b/elixir/apps/domain/test/domain/policies_test.exs @@ -6,13 +6,23 @@ defmodule Domain.PoliciesTest do setup do account = Fixtures.Accounts.create_account() actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account) + actor_group = Fixtures.Actors.create_group(account: account) identity = Fixtures.Auth.create_identity(account: account, actor: actor) subject = Fixtures.Auth.create_subject(identity: identity) + resource = Fixtures.Resources.create_resource(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group + ) %{ account: account, actor: actor, + actor_group: actor_group, identity: identity, + resource: resource, subject: subject } end @@ -704,10 +714,7 @@ defmodule Domain.PoliciesTest do end describe "delete_policies_for/1" do - setup %{account: account, subject: subject} do - resource = Fixtures.Resources.create_resource(account: account) - actor_group = Fixtures.Actors.create_group(account: account) - + setup %{resource: resource, actor_group: actor_group, account: account, subject: subject} do policy = Fixtures.Policies.create_policy( account: account, @@ -843,42 +850,365 @@ defmodule Domain.PoliciesTest do end end - describe "ensure_client_conforms_policy_conditions/2" do - test "returns :ok when client conforms to policy conditions", %{} do - client = %Domain.Clients.Client{ - last_seen_remote_ip_location_region: "US" - } - - policy = %Policies.Policy{ - conditions: [ - %Policies.Condition{ - property: :remote_ip_location_region, - operator: :is_in, - values: ["US"] - } - ] - } - - assert ensure_client_conforms_policy_conditions(client, policy) == {:ok, nil} + describe "filter_by_conforming_policies_for_client/2" do + test "returns empty list when there are no policies", %{} do + client = Fixtures.Clients.create_client() + assert filter_by_conforming_policies_for_client([], client) == [] end - test "returns error when client conforms to policy conditions", %{} do - client = %Domain.Clients.Client{ - last_seen_remote_ip_location_region: "US" - } + test "filters based on policy conditions", %{ + account: account, + resource: resource, + actor: actor, + actor_group: actor_group + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + actor_group2 = Fixtures.Actors.create_group(account: account) - policy = %Policies.Policy{ - conditions: [ - %Policies.Condition{ - property: :remote_ip_location_region, - operator: :is_in, - values: ["CA"] - } - ] - } + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) - assert ensure_client_conforms_policy_conditions(client, policy) == - {:error, {:forbidden, [violated_properties: [:remote_ip_location_region]]}} + actor_group3 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group3 + ) + + actor_group4 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group4 + ) + + actor_group5 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group5 + ) + + actor_group6 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group6 + ) + + policy1 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["CA"] + } + ] + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :remote_ip, + operator: :is_in_cidr, + values: ["1.1.1.1/32"] + } + ] + ) + + policy3 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group3, + conditions: [ + %{ + property: :provider_id, + operator: :is_in, + values: [Ecto.UUID.generate()] + } + ] + ) + + policy4 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group4, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: ["M/13:00:00-13:00:00/UTC"] + } + ] + ) + + policy5 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group5, + conditions: [ + %{ + property: :client_verified, + operator: :is, + values: ["true"] + } + ] + ) + + policy6 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group6, + conditions: [] + ) + + assert filter_by_conforming_policies_for_client( + [policy1, policy2, policy3, policy4, policy5, policy6], + client + ) == [policy6] + end + end + + describe "longest_conforming_policy_for_client/3" do + test "returns forbidden when there are no matching policies", %{ + account: account, + actor: actor, + subject: subject + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + + assert longest_conforming_policy_for_client([], client, subject.expires_at) == + {:error, {:forbidden, violated_properties: []}} + end + + test "accumulates and uniqs violated properties", %{ + account: account, + actor: actor, + actor_group: actor_group, + resource: resource, + subject: subject + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + + policy = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["MX"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["BR"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["TF"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["IT"] + } + ] + ) + + actor_group2 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) + + actor_group3 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group3 + ) + + actor_group4 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group4 + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["CA"] + } + ] + ) + + policy3 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group3, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["RU"] + } + ] + ) + + policy4 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group4, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: ["M/13:00:00-13:00:00/UTC"] + } + ] + ) + + assert {:error, {:forbidden, violated_properties: violated_properties}} = + longest_conforming_policy_for_client( + [policy, policy2, policy3, policy4], + client, + subject.expires_at + ) + + assert Enum.sort(violated_properties) == + Enum.sort([ + :remote_ip_location_region, + :current_utc_datetime + ]) + end + + test "returns expiration of longest conforming policy when less than passed default", %{ + account: account, + actor: actor, + actor_group: actor_group, + resource: resource + } do + # Build relative time ranges based on current time + now = DateTime.utc_now() + + # Get tomorrow as a letter - allows for consistent testing + day_letter = + case Date.day_of_week(now) do + # Monday + 1 -> "M" + # Tuesday + 2 -> "T" + # Wednesday + 3 -> "W" + # Thursday + 4 -> "R" + # Friday + 5 -> "F" + # Saturday + 6 -> "S" + # Sunday + 7 -> "U" + end + + time_range_1 = "#{day_letter}/00:00:00-23:59:59/UTC" + time_range_2 = "#{day_letter}/00:00:00-23:59:58/UTC" + + # Policy2: 1-hour window (shorter policy) + + client = Fixtures.Clients.create_client(account: account, actor: actor) + actor_group2 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) + + policy1 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: [time_range_1] + } + ] + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: [time_range_2] + } + ] + ) + + in_three_days = DateTime.utc_now() |> DateTime.add(3, :day) + + assert {:ok, expires_at, ^policy1} = + longest_conforming_policy_for_client( + [policy1, policy2], + client, + in_three_days + ) + + assert DateTime.compare(expires_at, in_three_days) == :lt + + in_one_minute = DateTime.utc_now() |> DateTime.add(1, :minute) + + assert {:ok, expires_at, ^policy1} = + longest_conforming_policy_for_client( + [policy1, policy2], + client, + in_one_minute + ) + + assert DateTime.compare(expires_at, in_one_minute) == :eq end end end