diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index ea0e4fc13..e2b3aed60 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -12,8 +12,7 @@ defmodule API.Client.Channel do Gateways, Relays, Policies, - Flows, - Tokens + Flows } alias Domain.Relays.Presence.Debouncer @@ -40,8 +39,7 @@ defmodule API.Client.Channel do @impl true def join("client", _payload, socket) do - with {:ok, socket} <- schedule_expiration(socket), - {:ok, gateway_version_requirement} <- + with {:ok, gateway_version_requirement} <- select_gateway_version_requirement(socket.assigns.client) do socket = assign(socket, gateway_version_requirement: gateway_version_requirement) @@ -51,19 +49,6 @@ defmodule API.Client.Channel do end end - defp schedule_expiration(%{assigns: %{subject: %{expires_at: expires_at}}} = socket) do - expires_in = - expires_at - |> DateTime.diff(DateTime.utc_now(), :millisecond) - - if expires_in > 0 do - Process.send_after(self(), :token_expired, expires_in) - {:ok, socket} - else - {:error, %{reason: :token_expired}} - end - end - @impl true # Called immediately after the client joins the channel @@ -542,26 +527,6 @@ defmodule API.Client.Channel do end end - # TOKENS - - def handle_info( - {:deleted, %Tokens.Token{type: :client, id: id}}, - %{assigns: %{subject: %{token_id: token_id}}} = socket - ) - when id == token_id do - disconnect(socket) - end - - #################################### - ##### Reacting to timed events ##### - #################################### - - # Message is scheduled by schedule_expiration/1 on topic join to be sent - # when the client token/subject expires - def handle_info(:token_expired, socket) do - disconnect(socket) - end - #################################### #### Reacting to relay presence #### #################################### @@ -758,7 +723,7 @@ defmodule API.Client.Channel do } with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateways} when gateways != [] <- Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject, preload: :group @@ -863,7 +828,7 @@ defmodule API.Client.Channel do # TODO: Optimization # Gateway selection and flow authorization shouldn't need to hit the DB with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, _policy, _expires_at} <- authorize_resource(socket, resource_id), + {:ok, _expires_at, _policy} <- authorize_resource(socket, resource_id), {:ok, [_ | _] = gateways} <- Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject, preload: :group @@ -920,7 +885,7 @@ defmodule API.Client.Channel do socket ) do with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization @@ -980,7 +945,7 @@ defmodule API.Client.Channel do ) do # Flow authorization can happen out-of-band since we just authorized the resource above with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id), - {:ok, policy, expires_at} <- authorize_resource(socket, resource_id), + {:ok, expires_at, policy} <- authorize_resource(socket, resource_id), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization @@ -1285,7 +1250,7 @@ defmodule API.Client.Channel do end end - # Returns either the authorized policy or an error tuple of violated properties + # Returns either the longest authorized policy or an error tuple of violated properties defp authorize_resource(socket, resource_id) do OpenTelemetry.Tracer.with_span "client.authorize_resource", attributes: %{ @@ -1294,27 +1259,10 @@ defmodule API.Client.Channel do socket.assigns.policies |> Enum.filter(fn {_id, policy} -> policy.resource_id == resource_id end) |> Enum.map(fn {_id, policy} -> policy end) - |> Enum.reduce_while({:error, []}, fn policy, {:error, acc} -> - case Policies.ensure_client_conforms_policy_conditions(socket.assigns.client, policy) do - {:ok, expires_at} -> - {:halt, {:ok, policy, expires_at}} - - {:error, {:forbidden, violated_properties: violated_properties}} -> - {:cont, {:error, violated_properties ++ acc}} - end - end) - |> case do - {:error, violated_properties} -> - {:error, {:forbidden, violated_properties: violated_properties}} - - {:ok, policy, expires_at} -> - # Set a maximum expiration time for the authorization - expires_at = - expires_at || socket.assigns.subject.expires_at || - DateTime.utc_now() |> DateTime.add(14, :day) - - {:ok, policy, expires_at} - end + |> Policies.longest_conforming_policy_for_client( + socket.assigns.client, + socket.assigns.subject.expires_at + ) end end end diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index f1530956e..8e350e35e 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources, Tokens} + alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -110,38 +110,67 @@ defmodule API.Gateway.Channel do when gateway_id == id do tuple = {flow.client_id, flow.resource_id} - socket = - if flows_map = Map.get(socket.assigns.flows, tuple) do - flow_map = Map.delete(flows_map, flow.id) + if flows_map = Map.get(socket.assigns.flows, tuple) do + flow_map = Map.delete(flows_map, flow.id) + remaining = map_size(flow_map) - with 0 <- map_size(flow_map), - {:ok, new_flow} <- Flows.reauthorize_flow(flow) do - flow_map = %{ - new_flow.id => new_flow.expires_at - } + if remaining == 0 do + case Flows.reauthorize_flow(flow) do + {:ok, new_flow} -> + flow_map = %{ + new_flow.id => new_flow.expires_at + } - push( - socket, - "access_authorization_expiry_updated", - Views.Flow.render(new_flow, new_flow.expires_at) - ) + Logger.info("Updated flow authorization", + old_flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id, + new_flow_id: new_flow.id + ) + + push( + socket, + "access_authorization_expiry_updated", + Views.Flow.render(new_flow, new_flow.expires_at) + ) + + {:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))} + + {:error, :forbidden} -> + Logger.info("Last flow deleted; revoking access", + flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id + ) - assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map)) - else - _ -> # Send reject_access if access is no longer granted + + # TODO: Verify that if the client's websocket connection flaps at the moment this + # message is received by the gateway, and the client still has access to this resource, + # the client will clear its state so it can request a new flow. push(socket, "reject_access", %{ client_id: flow.client_id, resource_id: flow.resource_id }) - assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + {:noreply, assign(socket, flows: Map.delete(socket.assigns.flows, tuple))} end else - socket - end + Logger.info("Flow deleted but still has remaining flows for client/resource", + flow_id: flow.id, + account_id: flow.account_id, + client_id: flow.client_id, + resource_id: flow.resource_id, + remaining: remaining + ) - {:noreply, socket} + {:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))} + end + else + {:noreply, socket} + end end # RESOURCES @@ -165,14 +194,6 @@ defmodule API.Gateway.Channel do {:noreply, socket} end - # TOKENS - - # Our gateway token was deleted - disconnect WebSocket - def handle_info({:deleted, %Tokens.Token{type: :gateway_group, id: id}}, socket) - when id == socket.assigns.token_id do - disconnect(socket) - end - #################################### #### Reacting to relay presence #### #################################### @@ -650,10 +671,4 @@ defmodule API.Gateway.Channel do assign(socket, flows: flows) end end - - defp disconnect(socket) do - push(socket, "disconnect", %{"reason" => "token_expired"}) - send(socket.transport_pid, %Phoenix.Socket.Broadcast{event: "disconnect"}) - {:stop, :shutdown, socket} - end end diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index aa0b64a6f..1370e6525 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -200,14 +200,16 @@ defmodule API.Client.ChannelTest do refute_receive {:socket_close, _pid, _} end - test "expires the channel when token is expired", %{client: client, subject: subject} do - expires_at = DateTime.utc_now() |> DateTime.add(25, :millisecond) - subject = %{subject | expires_at: expires_at} - + test "send disconnect broadcast when the token is deleted", %{ + client: client, + subject: subject + } do # We need to trap exits to avoid test process termination # because it is linked to the created test channel process Process.flag(:trap_exit, true) + :ok = Domain.PubSub.subscribe("sessions:#{subject.token_id}") + {:ok, _reply, _socket} = API.Client.Socket |> socket("client:#{client.id}", %{ @@ -216,9 +218,23 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - assert_push "disconnect", %{reason: :token_expired}, 250 - assert_receive {:EXIT, _pid, :shutdown} - assert_receive {:socket_close, _pid, :shutdown} + token = Repo.get_by(Domain.Tokens.Token, id: subject.token_id) + + data = %{ + "id" => token.id, + "account_id" => token.account_id, + "type" => token.type, + "expires_at" => token.expires_at + } + + Domain.Events.Hooks.Tokens.on_delete(data) + + assert_receive %Phoenix.Socket.Broadcast{ + topic: topic, + event: "disconnect" + } + + assert topic == "sessions:#{token.id}" end test "selects compatible gateway versions", %{client: client, subject: subject} do diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 0e441c3ad..f6aaedfd1 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -126,6 +126,7 @@ defmodule API.Gateway.ChannelTest do Process.flag(:trap_exit, true) :ok = Domain.PubSub.Account.subscribe(account.id) + :ok = Domain.PubSub.subscribe("sessions:#{token.id}") data = %{ "id" => token.id, @@ -136,11 +137,14 @@ defmodule API.Gateway.ChannelTest do Events.Hooks.Tokens.on_delete(data) assert_receive {:deleted, deleted_token} - assert_push "disconnect", payload - assert_receive {:EXIT, _pid, _} - assert_receive {:socket_close, _pid, _} + + assert_receive %Phoenix.Socket.Broadcast{ + topic: topic, + event: "disconnect" + } + + assert topic == "sessions:#{token.id}" assert deleted_token.id == token.id - assert payload == %{"reason" => "token_expired"} end test "pushes allow_access message", %{ @@ -294,7 +298,11 @@ defmodule API.Gateway.ChannelTest do "id" => flow1.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "token_id" => flow1.token_id, + "policy_id" => flow1.policy_id, + "actor_group_membership_id" => flow1.actor_group_membership_id, + "expires_at" => flow1.expires_at } send( @@ -361,7 +369,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } send( @@ -499,7 +511,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => other_flow1.client_id, "resource_id" => other_flow1.resource_id, "account_id" => other_flow1.account_id, - "gateway_id" => other_flow1.gateway_id + "gateway_id" => other_flow1.gateway_id, + "token_id" => other_flow1.token_id, + "policy_id" => other_flow1.policy_id, + "actor_group_membership_id" => other_flow1.actor_group_membership_id, + "expires_at" => other_flow1.expires_at } Events.Hooks.Flows.on_delete(data) @@ -517,7 +533,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => other_flow2.client_id, "resource_id" => other_flow2.resource_id, "account_id" => other_flow2.account_id, - "gateway_id" => other_flow2.gateway_id + "gateway_id" => other_flow2.gateway_id, + "token_id" => other_flow2.token_id, + "policy_id" => other_flow2.policy_id, + "actor_group_membership_id" => other_flow2.actor_group_membership_id, + "expires_at" => other_flow2.expires_at } Events.Hooks.Flows.on_delete(data) @@ -944,7 +964,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } Events.Hooks.Flows.on_delete(data) @@ -1074,7 +1098,11 @@ defmodule API.Gateway.ChannelTest do "client_id" => client.id, "resource_id" => resource.id, "account_id" => account.id, - "gateway_id" => gateway.id + "gateway_id" => gateway.id, + "token_id" => flow.token_id, + "actor_group_membership_id" => flow.actor_group_membership_id, + "policy_id" => flow.policy_id, + "expires_at" => flow.expires_at } Events.Hooks.Flows.on_delete(data) diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 3e240e791..7488ccff6 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -1,6 +1,6 @@ defmodule Domain.Flows do alias Domain.Repo - alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies} + alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies, Tokens} alias Domain.Flows.{Authorizer, Flow} require Ecto.Query require Logger @@ -34,6 +34,11 @@ defmodule Domain.Flows do expires_at ) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()) do + # Set the expiration time for the authorization. For normal user clients, this is 7 days, defined in Domain.Auth. + # For service accounts using a headless client, it's configurable. This will always be set. + # We always cap this by the subject's expires_at so that we synchronize removal of the flow on the gateway + # with the websocket connection expiration on the client. + flow = Flow.Changeset.create(%{ token_id: token_id, @@ -59,19 +64,28 @@ defmodule Domain.Flows do # This can happen if a Policy was created that grants redundant access to a client that # is already connected to the Resource, then the initial Policy is deleted. # - # We need to create a new flow with the new Policy but the same (or shorter) expiration as - # the old flow. + # We need to create a new flow with the new Policy. Unfortunately getting the expiration + # right is a bit tricky, since we need to synchronize this with the client's state. + # If we expire the flow too early, the client will lose access to the Resource without + # any change in client state. If we expire the flow too late, the client will have access + # to the Resource beyond its intended expiration time. + # + # So, we use the minimum of either the policy condition or the origin flow's expiration time. def reauthorize_flow(%Flow{} = flow) do with {:ok, client} <- Clients.fetch_client_by_id(flow.client_id, preload: :identity), + # TODO: Hard delete + # We need to ensure token and gateway haven't been deleted after the initial flow was created + # This can be removed after hard-delete since we'll get a DB error if these associations no longer exist + {:ok, _token} <- Tokens.fetch_token_by_id(flow.token_id), + {:ok, _gateway} <- Gateways.fetch_gateway_by_id(flow.gateway_id), policies when policies != [] <- - Policies.all_policies_for_resource_id!( + Policies.all_policies_for_resource_id_and_actor_id!( flow.account_id, - flow.resource_id + flow.resource_id, + client.actor_id ), - conforming_policies when conforming_policies != [] <- - Policies.filter_by_conforming_policies_for_client(policies, client), - policy <- Enum.at(conforming_policies, 0), - {:ok, expires_at} <- Policies.ensure_client_conforms_policy_conditions(client, policy), + {:ok, expires_at, policy} <- + Policies.longest_conforming_policy_for_client(policies, client, flow.expires_at), {:ok, membership} <- Actors.fetch_membership_by_actor_id_and_group_id( client.actor_id, @@ -80,7 +94,7 @@ defmodule Domain.Flows do {:ok, new_flow} <- Flow.Changeset.create(%{ token_id: flow.token_id, - policy_id: Enum.at(conforming_policies, 0).id, + policy_id: policy.id, client_id: flow.client_id, gateway_id: flow.gateway_id, resource_id: flow.resource_id, @@ -89,13 +103,16 @@ defmodule Domain.Flows do client_remote_ip: client.last_seen_remote_ip, client_user_agent: client.last_seen_user_agent, gateway_remote_ip: flow.gateway_remote_ip, - expires_at: expires_at || flow.expires_at + expires_at: expires_at }) |> Repo.insert() do {:ok, new_flow} else reason -> - Logger.info("Failed to reauthorize flow: #{inspect(reason)}") + Logger.info("Failed to reauthorize flow", + reason: inspect(reason) + ) + {:error, :forbidden} end end @@ -209,10 +226,15 @@ defmodule Domain.Flows do |> Repo.delete_all() end - def delete_flows_for(%Domain.Tokens.Token{} = token) do + def delete_flows_for(%Domain.Tokens.Token{account_id: nil}) do + # Tokens without an account_id are not associated with any flows. I.e. global relay tokens + {0, []} + end + + def delete_flows_for(%Domain.Tokens.Token{id: id, account_id: account_id}) do Flow.Query.all() - |> Flow.Query.by_account_id(token.account_id) - |> Flow.Query.by_token_id(token.id) + |> Flow.Query.by_account_id(account_id) + |> Flow.Query.by_token_id(id) |> Repo.delete_all() end diff --git a/elixir/apps/domain/lib/domain/gateways.ex b/elixir/apps/domain/lib/domain/gateways.ex index 4dfc1d0ca..209ec0e98 100644 --- a/elixir/apps/domain/lib/domain/gateways.ex +++ b/elixir/apps/domain/lib/domain/gateways.ex @@ -23,6 +23,12 @@ defmodule Domain.Gateways do |> Repo.aggregate(:count) end + def fetch_gateway_by_id(id) do + Gateway.Query.not_deleted() + |> Gateway.Query.by_id(id) + |> Repo.fetch(Gateway.Query, []) + end + def fetch_group_by_id(id, %Auth.Subject{} = subject, opts \\ []) do required_permissions = {:one_of, diff --git a/elixir/apps/domain/lib/domain/policies.ex b/elixir/apps/domain/lib/domain/policies.ex index f1aafe220..92b3580f0 100644 --- a/elixir/apps/domain/lib/domain/policies.ex +++ b/elixir/apps/domain/lib/domain/policies.ex @@ -74,10 +74,11 @@ defmodule Domain.Policies do |> Repo.all() end - def all_policies_for_resource_id!(account_id, resource_id) do + def all_policies_for_resource_id_and_actor_id!(account_id, resource_id, actor_id) do Policy.Query.not_disabled() |> Policy.Query.by_account_id(account_id) |> Policy.Query.by_resource_id(resource_id) + |> Policy.Query.by_actor_id(actor_id) |> Repo.all() end @@ -187,7 +188,6 @@ defmodule Domain.Policies do def filter_by_conforming_policies_for_client(policies, %Clients.Client{} = client) do Enum.filter(policies, fn policy -> policy.conditions - |> Enum.filter(&Condition.Evaluator.evaluable_on_connect?/1) |> Condition.Evaluator.ensure_conforms(client) |> case do {:ok, _expires_at} -> true @@ -196,7 +196,55 @@ defmodule Domain.Policies do end) end - def ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do + @infinity ~U[9999-12-31 23:59:59.999999Z] + + def longest_conforming_policy_for_client(policies, client, token_expires_at) do + policies + |> Enum.reduce(%{failed: [], succeeded: []}, fn policy, acc -> + case ensure_client_conforms_policy_conditions(client, policy) do + {:ok, expires_at} -> + %{acc | succeeded: [{expires_at, policy} | acc.succeeded]} + + {:error, {:forbidden, violated_properties: violated_properties}} -> + %{acc | failed: acc.failed ++ violated_properties} + end + end) + |> case do + %{succeeded: [], failed: failed} -> + {:error, {:forbidden, violated_properties: Enum.uniq(failed)}} + + %{succeeded: succeeded} -> + {expires_at, policy} = + succeeded + |> Enum.max_by(fn {expires_at, _policy} -> expires_at || @infinity end) + + {:ok, min_expires_at(expires_at, token_expires_at), policy} + end + end + + # All client tokens have *some* expiration + def min_expires_at(nil, nil), + do: raise("Both policy_expires_at and token_expires_at cannot be nil") + + def min_expires_at(nil, token_expires_at), do: token_expires_at + + def min_expires_at(%DateTime{} = policy_expires_at, %DateTime{} = token_expires_at) do + if DateTime.compare(policy_expires_at, token_expires_at) == :lt do + policy_expires_at + else + token_expires_at + end + end + + defp ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do + if subject.account.id == policy.account_id do + :ok + else + {:error, :unauthorized} + end + end + + defp ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do case Condition.Evaluator.ensure_conforms(policy.conditions, client) do {:ok, expires_at} -> {:ok, expires_at} @@ -205,12 +253,4 @@ defmodule Domain.Policies do {:error, {:forbidden, violated_properties: violated_properties}} end end - - def ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do - if subject.account.id == policy.account_id do - :ok - else - {:error, :unauthorized} - end - end end diff --git a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex index 364ef145e..d14ec6e97 100644 --- a/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex +++ b/elixir/apps/domain/lib/domain/policies/condition/evaluator.ex @@ -4,13 +4,6 @@ defmodule Domain.Policies.Condition.Evaluator do @days_of_week ~w[M T W R F S U] - @doc """ - Returns `true` if the condition can be evaluated during connection (eg. for IP address matching - it can't change while socket is open), otherwise `false`. - """ - def evaluable_on_connect?(%Condition{property: "current_utc_datetime"}), do: false - def evaluable_on_connect?(_), do: true - def ensure_conforms([], %Clients.Client{}) do {:ok, nil} end diff --git a/elixir/apps/domain/lib/domain/pubsub.ex b/elixir/apps/domain/lib/domain/pubsub.ex index 0e50af306..721d5b31e 100644 --- a/elixir/apps/domain/lib/domain/pubsub.ex +++ b/elixir/apps/domain/lib/domain/pubsub.ex @@ -5,6 +5,8 @@ defmodule Domain.PubSub do """ use Supervisor + require Logger + def start_link(opts) do Supervisor.start_link(__MODULE__, opts) end @@ -49,6 +51,12 @@ defmodule Domain.PubSub do |> Domain.PubSub.subscribe() end + def broadcast(nil, payload) do + Logger.warning("Broadcasting to nil account_id is not allowed", + payload: inspect(payload) + ) + end + def broadcast(account_id, payload) do account_id |> topic() diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index a3961339a..f4327adf3 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -19,6 +19,13 @@ defmodule Domain.Tokens do Supervisor.init(children, strategy: :one_for_one) end + def fetch_token_by_id(id) do + Token.Query.not_deleted() + |> Token.Query.not_expired() + |> Token.Query.by_id(id) + |> Repo.fetch(Token.Query, []) + end + def fetch_token_by_id(id, %Auth.Subject{} = subject, opts \\ []) do required_permissions = {:one_of, diff --git a/elixir/apps/domain/test/domain/policies_test.exs b/elixir/apps/domain/test/domain/policies_test.exs index 7a55c7d75..9738934cd 100644 --- a/elixir/apps/domain/test/domain/policies_test.exs +++ b/elixir/apps/domain/test/domain/policies_test.exs @@ -6,13 +6,23 @@ defmodule Domain.PoliciesTest do setup do account = Fixtures.Accounts.create_account() actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account) + actor_group = Fixtures.Actors.create_group(account: account) identity = Fixtures.Auth.create_identity(account: account, actor: actor) subject = Fixtures.Auth.create_subject(identity: identity) + resource = Fixtures.Resources.create_resource(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group + ) %{ account: account, actor: actor, + actor_group: actor_group, identity: identity, + resource: resource, subject: subject } end @@ -704,10 +714,7 @@ defmodule Domain.PoliciesTest do end describe "delete_policies_for/1" do - setup %{account: account, subject: subject} do - resource = Fixtures.Resources.create_resource(account: account) - actor_group = Fixtures.Actors.create_group(account: account) - + setup %{resource: resource, actor_group: actor_group, account: account, subject: subject} do policy = Fixtures.Policies.create_policy( account: account, @@ -843,42 +850,365 @@ defmodule Domain.PoliciesTest do end end - describe "ensure_client_conforms_policy_conditions/2" do - test "returns :ok when client conforms to policy conditions", %{} do - client = %Domain.Clients.Client{ - last_seen_remote_ip_location_region: "US" - } - - policy = %Policies.Policy{ - conditions: [ - %Policies.Condition{ - property: :remote_ip_location_region, - operator: :is_in, - values: ["US"] - } - ] - } - - assert ensure_client_conforms_policy_conditions(client, policy) == {:ok, nil} + describe "filter_by_conforming_policies_for_client/2" do + test "returns empty list when there are no policies", %{} do + client = Fixtures.Clients.create_client() + assert filter_by_conforming_policies_for_client([], client) == [] end - test "returns error when client conforms to policy conditions", %{} do - client = %Domain.Clients.Client{ - last_seen_remote_ip_location_region: "US" - } + test "filters based on policy conditions", %{ + account: account, + resource: resource, + actor: actor, + actor_group: actor_group + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + actor_group2 = Fixtures.Actors.create_group(account: account) - policy = %Policies.Policy{ - conditions: [ - %Policies.Condition{ - property: :remote_ip_location_region, - operator: :is_in, - values: ["CA"] - } - ] - } + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) - assert ensure_client_conforms_policy_conditions(client, policy) == - {:error, {:forbidden, [violated_properties: [:remote_ip_location_region]]}} + actor_group3 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group3 + ) + + actor_group4 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group4 + ) + + actor_group5 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group5 + ) + + actor_group6 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group6 + ) + + policy1 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["CA"] + } + ] + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :remote_ip, + operator: :is_in_cidr, + values: ["1.1.1.1/32"] + } + ] + ) + + policy3 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group3, + conditions: [ + %{ + property: :provider_id, + operator: :is_in, + values: [Ecto.UUID.generate()] + } + ] + ) + + policy4 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group4, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: ["M/13:00:00-13:00:00/UTC"] + } + ] + ) + + policy5 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group5, + conditions: [ + %{ + property: :client_verified, + operator: :is, + values: ["true"] + } + ] + ) + + policy6 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group6, + conditions: [] + ) + + assert filter_by_conforming_policies_for_client( + [policy1, policy2, policy3, policy4, policy5, policy6], + client + ) == [policy6] + end + end + + describe "longest_conforming_policy_for_client/3" do + test "returns forbidden when there are no matching policies", %{ + account: account, + actor: actor, + subject: subject + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + + assert longest_conforming_policy_for_client([], client, subject.expires_at) == + {:error, {:forbidden, violated_properties: []}} + end + + test "accumulates and uniqs violated properties", %{ + account: account, + actor: actor, + actor_group: actor_group, + resource: resource, + subject: subject + } do + client = Fixtures.Clients.create_client(account: account, actor: actor) + + policy = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["MX"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["BR"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["TF"] + }, + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["IT"] + } + ] + ) + + actor_group2 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) + + actor_group3 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group3 + ) + + actor_group4 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group4 + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["CA"] + } + ] + ) + + policy3 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group3, + conditions: [ + %{ + property: :remote_ip_location_region, + operator: :is_in, + values: ["RU"] + } + ] + ) + + policy4 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group4, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: ["M/13:00:00-13:00:00/UTC"] + } + ] + ) + + assert {:error, {:forbidden, violated_properties: violated_properties}} = + longest_conforming_policy_for_client( + [policy, policy2, policy3, policy4], + client, + subject.expires_at + ) + + assert Enum.sort(violated_properties) == + Enum.sort([ + :remote_ip_location_region, + :current_utc_datetime + ]) + end + + test "returns expiration of longest conforming policy when less than passed default", %{ + account: account, + actor: actor, + actor_group: actor_group, + resource: resource + } do + # Build relative time ranges based on current time + now = DateTime.utc_now() + + # Get tomorrow as a letter - allows for consistent testing + day_letter = + case Date.day_of_week(now) do + # Monday + 1 -> "M" + # Tuesday + 2 -> "T" + # Wednesday + 3 -> "W" + # Thursday + 4 -> "R" + # Friday + 5 -> "F" + # Saturday + 6 -> "S" + # Sunday + 7 -> "U" + end + + time_range_1 = "#{day_letter}/00:00:00-23:59:59/UTC" + time_range_2 = "#{day_letter}/00:00:00-23:59:58/UTC" + + # Policy2: 1-hour window (shorter policy) + + client = Fixtures.Clients.create_client(account: account, actor: actor) + actor_group2 = Fixtures.Actors.create_group(account: account) + + Fixtures.Actors.create_membership( + account: account, + actor: actor, + group: actor_group2 + ) + + policy1 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: [time_range_1] + } + ] + ) + + policy2 = + Fixtures.Policies.create_policy( + account: account, + resource: resource, + actor_group: actor_group2, + conditions: [ + %{ + property: :current_utc_datetime, + operator: :is_in_day_of_week_time_ranges, + values: [time_range_2] + } + ] + ) + + in_three_days = DateTime.utc_now() |> DateTime.add(3, :day) + + assert {:ok, expires_at, ^policy1} = + longest_conforming_policy_for_client( + [policy1, policy2], + client, + in_three_days + ) + + assert DateTime.compare(expires_at, in_three_days) == :lt + + in_one_minute = DateTime.utc_now() |> DateTime.add(1, :minute) + + assert {:ok, expires_at, ^policy1} = + longest_conforming_policy_for_client( + [policy1, policy2], + client, + in_one_minute + ) + + assert DateTime.compare(expires_at, in_one_minute) == :eq end end end