mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
fix(portal): don't send reject_access for remaining flows (#10071)
This fixes a simple logic bug where we were mistakenly reacting to a flow deletion event where flows still existed in the cache by sending `reject_access`. This fixes that bug, and adds more comprehensive logging to help diagnose issues like this more quickly in the future. This PR also fixes the following issues found during the investigation: - We were redundantly reacting to Token deletion in the channel pids. This is unnecessary: we send a global socket disconnect from the Token hook module instead. - We had a bug that would crash the WAL consumer if a "global" token (i.e. relay) was deleted or expired - these have no `account_id`. - We now always use `min(max(all_conforming_polices_expiration), token.expires_at)` when setting expiration on a new flow to minimize the possibility for access churn. - We now check to ensure the token and gateway are still undeleted when re-authorizing a given flow. This prevents us from failing to send `reject_access` when a token or gateway is deleted corresponding to a flow, but the other entities would have granted access. Related: https://firezone.statuspage.io/incidents/xrsm13tml3dh Related: #10068 Related: #9501
This commit is contained in:
@@ -12,8 +12,7 @@ defmodule API.Client.Channel do
|
||||
Gateways,
|
||||
Relays,
|
||||
Policies,
|
||||
Flows,
|
||||
Tokens
|
||||
Flows
|
||||
}
|
||||
|
||||
alias Domain.Relays.Presence.Debouncer
|
||||
@@ -40,8 +39,7 @@ defmodule API.Client.Channel do
|
||||
|
||||
@impl true
|
||||
def join("client", _payload, socket) do
|
||||
with {:ok, socket} <- schedule_expiration(socket),
|
||||
{:ok, gateway_version_requirement} <-
|
||||
with {:ok, gateway_version_requirement} <-
|
||||
select_gateway_version_requirement(socket.assigns.client) do
|
||||
socket = assign(socket, gateway_version_requirement: gateway_version_requirement)
|
||||
|
||||
@@ -51,19 +49,6 @@ defmodule API.Client.Channel do
|
||||
end
|
||||
end
|
||||
|
||||
defp schedule_expiration(%{assigns: %{subject: %{expires_at: expires_at}}} = socket) do
|
||||
expires_in =
|
||||
expires_at
|
||||
|> DateTime.diff(DateTime.utc_now(), :millisecond)
|
||||
|
||||
if expires_in > 0 do
|
||||
Process.send_after(self(), :token_expired, expires_in)
|
||||
{:ok, socket}
|
||||
else
|
||||
{:error, %{reason: :token_expired}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
||||
# Called immediately after the client joins the channel
|
||||
@@ -542,26 +527,6 @@ defmodule API.Client.Channel do
|
||||
end
|
||||
end
|
||||
|
||||
# TOKENS
|
||||
|
||||
def handle_info(
|
||||
{:deleted, %Tokens.Token{type: :client, id: id}},
|
||||
%{assigns: %{subject: %{token_id: token_id}}} = socket
|
||||
)
|
||||
when id == token_id do
|
||||
disconnect(socket)
|
||||
end
|
||||
|
||||
####################################
|
||||
##### Reacting to timed events #####
|
||||
####################################
|
||||
|
||||
# Message is scheduled by schedule_expiration/1 on topic join to be sent
|
||||
# when the client token/subject expires
|
||||
def handle_info(:token_expired, socket) do
|
||||
disconnect(socket)
|
||||
end
|
||||
|
||||
####################################
|
||||
#### Reacting to relay presence ####
|
||||
####################################
|
||||
@@ -758,7 +723,7 @@ defmodule API.Client.Channel do
|
||||
}
|
||||
|
||||
with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id),
|
||||
{:ok, policy, expires_at} <- authorize_resource(socket, resource_id),
|
||||
{:ok, expires_at, policy} <- authorize_resource(socket, resource_id),
|
||||
{:ok, gateways} when gateways != [] <-
|
||||
Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject,
|
||||
preload: :group
|
||||
@@ -863,7 +828,7 @@ defmodule API.Client.Channel do
|
||||
# TODO: Optimization
|
||||
# Gateway selection and flow authorization shouldn't need to hit the DB
|
||||
with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id),
|
||||
{:ok, _policy, _expires_at} <- authorize_resource(socket, resource_id),
|
||||
{:ok, _expires_at, _policy} <- authorize_resource(socket, resource_id),
|
||||
{:ok, [_ | _] = gateways} <-
|
||||
Gateways.all_connected_gateways_for_resource(resource, socket.assigns.subject,
|
||||
preload: :group
|
||||
@@ -920,7 +885,7 @@ defmodule API.Client.Channel do
|
||||
socket
|
||||
) do
|
||||
with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id),
|
||||
{:ok, policy, expires_at} <- authorize_resource(socket, resource_id),
|
||||
{:ok, expires_at, policy} <- authorize_resource(socket, resource_id),
|
||||
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
||||
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
|
||||
# TODO: Optimization
|
||||
@@ -980,7 +945,7 @@ defmodule API.Client.Channel do
|
||||
) do
|
||||
# Flow authorization can happen out-of-band since we just authorized the resource above
|
||||
with {:ok, resource} <- Map.fetch(socket.assigns.resources, resource_id),
|
||||
{:ok, policy, expires_at} <- authorize_resource(socket, resource_id),
|
||||
{:ok, expires_at, policy} <- authorize_resource(socket, resource_id),
|
||||
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
||||
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
|
||||
# TODO: Optimization
|
||||
@@ -1285,7 +1250,7 @@ defmodule API.Client.Channel do
|
||||
end
|
||||
end
|
||||
|
||||
# Returns either the authorized policy or an error tuple of violated properties
|
||||
# Returns either the longest authorized policy or an error tuple of violated properties
|
||||
defp authorize_resource(socket, resource_id) do
|
||||
OpenTelemetry.Tracer.with_span "client.authorize_resource",
|
||||
attributes: %{
|
||||
@@ -1294,27 +1259,10 @@ defmodule API.Client.Channel do
|
||||
socket.assigns.policies
|
||||
|> Enum.filter(fn {_id, policy} -> policy.resource_id == resource_id end)
|
||||
|> Enum.map(fn {_id, policy} -> policy end)
|
||||
|> Enum.reduce_while({:error, []}, fn policy, {:error, acc} ->
|
||||
case Policies.ensure_client_conforms_policy_conditions(socket.assigns.client, policy) do
|
||||
{:ok, expires_at} ->
|
||||
{:halt, {:ok, policy, expires_at}}
|
||||
|
||||
{:error, {:forbidden, violated_properties: violated_properties}} ->
|
||||
{:cont, {:error, violated_properties ++ acc}}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:error, violated_properties} ->
|
||||
{:error, {:forbidden, violated_properties: violated_properties}}
|
||||
|
||||
{:ok, policy, expires_at} ->
|
||||
# Set a maximum expiration time for the authorization
|
||||
expires_at =
|
||||
expires_at || socket.assigns.subject.expires_at ||
|
||||
DateTime.utc_now() |> DateTime.add(14, :day)
|
||||
|
||||
{:ok, policy, expires_at}
|
||||
end
|
||||
|> Policies.longest_conforming_policy_for_client(
|
||||
socket.assigns.client,
|
||||
socket.assigns.subject.expires_at
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
defmodule API.Gateway.Channel do
|
||||
use API, :channel
|
||||
alias API.Gateway.Views
|
||||
alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources, Tokens}
|
||||
alias Domain.{Accounts, Flows, Gateways, PubSub, Relays, Resources}
|
||||
alias Domain.Relays.Presence.Debouncer
|
||||
require Logger
|
||||
require OpenTelemetry.Tracer
|
||||
@@ -110,38 +110,67 @@ defmodule API.Gateway.Channel do
|
||||
when gateway_id == id do
|
||||
tuple = {flow.client_id, flow.resource_id}
|
||||
|
||||
socket =
|
||||
if flows_map = Map.get(socket.assigns.flows, tuple) do
|
||||
flow_map = Map.delete(flows_map, flow.id)
|
||||
if flows_map = Map.get(socket.assigns.flows, tuple) do
|
||||
flow_map = Map.delete(flows_map, flow.id)
|
||||
remaining = map_size(flow_map)
|
||||
|
||||
with 0 <- map_size(flow_map),
|
||||
{:ok, new_flow} <- Flows.reauthorize_flow(flow) do
|
||||
flow_map = %{
|
||||
new_flow.id => new_flow.expires_at
|
||||
}
|
||||
if remaining == 0 do
|
||||
case Flows.reauthorize_flow(flow) do
|
||||
{:ok, new_flow} ->
|
||||
flow_map = %{
|
||||
new_flow.id => new_flow.expires_at
|
||||
}
|
||||
|
||||
push(
|
||||
socket,
|
||||
"access_authorization_expiry_updated",
|
||||
Views.Flow.render(new_flow, new_flow.expires_at)
|
||||
)
|
||||
Logger.info("Updated flow authorization",
|
||||
old_flow_id: flow.id,
|
||||
account_id: flow.account_id,
|
||||
client_id: flow.client_id,
|
||||
resource_id: flow.resource_id,
|
||||
new_flow_id: new_flow.id
|
||||
)
|
||||
|
||||
push(
|
||||
socket,
|
||||
"access_authorization_expiry_updated",
|
||||
Views.Flow.render(new_flow, new_flow.expires_at)
|
||||
)
|
||||
|
||||
{:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))}
|
||||
|
||||
{:error, :forbidden} ->
|
||||
Logger.info("Last flow deleted; revoking access",
|
||||
flow_id: flow.id,
|
||||
account_id: flow.account_id,
|
||||
client_id: flow.client_id,
|
||||
resource_id: flow.resource_id
|
||||
)
|
||||
|
||||
assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))
|
||||
else
|
||||
_ ->
|
||||
# Send reject_access if access is no longer granted
|
||||
|
||||
# TODO: Verify that if the client's websocket connection flaps at the moment this
|
||||
# message is received by the gateway, and the client still has access to this resource,
|
||||
# the client will clear its state so it can request a new flow.
|
||||
push(socket, "reject_access", %{
|
||||
client_id: flow.client_id,
|
||||
resource_id: flow.resource_id
|
||||
})
|
||||
|
||||
assign(socket, flows: Map.delete(socket.assigns.flows, tuple))
|
||||
{:noreply, assign(socket, flows: Map.delete(socket.assigns.flows, tuple))}
|
||||
end
|
||||
else
|
||||
socket
|
||||
end
|
||||
Logger.info("Flow deleted but still has remaining flows for client/resource",
|
||||
flow_id: flow.id,
|
||||
account_id: flow.account_id,
|
||||
client_id: flow.client_id,
|
||||
resource_id: flow.resource_id,
|
||||
remaining: remaining
|
||||
)
|
||||
|
||||
{:noreply, socket}
|
||||
{:noreply, assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))}
|
||||
end
|
||||
else
|
||||
{:noreply, socket}
|
||||
end
|
||||
end
|
||||
|
||||
# RESOURCES
|
||||
@@ -165,14 +194,6 @@ defmodule API.Gateway.Channel do
|
||||
{:noreply, socket}
|
||||
end
|
||||
|
||||
# TOKENS
|
||||
|
||||
# Our gateway token was deleted - disconnect WebSocket
|
||||
def handle_info({:deleted, %Tokens.Token{type: :gateway_group, id: id}}, socket)
|
||||
when id == socket.assigns.token_id do
|
||||
disconnect(socket)
|
||||
end
|
||||
|
||||
####################################
|
||||
#### Reacting to relay presence ####
|
||||
####################################
|
||||
@@ -650,10 +671,4 @@ defmodule API.Gateway.Channel do
|
||||
assign(socket, flows: flows)
|
||||
end
|
||||
end
|
||||
|
||||
defp disconnect(socket) do
|
||||
push(socket, "disconnect", %{"reason" => "token_expired"})
|
||||
send(socket.transport_pid, %Phoenix.Socket.Broadcast{event: "disconnect"})
|
||||
{:stop, :shutdown, socket}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -200,14 +200,16 @@ defmodule API.Client.ChannelTest do
|
||||
refute_receive {:socket_close, _pid, _}
|
||||
end
|
||||
|
||||
test "expires the channel when token is expired", %{client: client, subject: subject} do
|
||||
expires_at = DateTime.utc_now() |> DateTime.add(25, :millisecond)
|
||||
subject = %{subject | expires_at: expires_at}
|
||||
|
||||
test "send disconnect broadcast when the token is deleted", %{
|
||||
client: client,
|
||||
subject: subject
|
||||
} do
|
||||
# We need to trap exits to avoid test process termination
|
||||
# because it is linked to the created test channel process
|
||||
Process.flag(:trap_exit, true)
|
||||
|
||||
:ok = Domain.PubSub.subscribe("sessions:#{subject.token_id}")
|
||||
|
||||
{:ok, _reply, _socket} =
|
||||
API.Client.Socket
|
||||
|> socket("client:#{client.id}", %{
|
||||
@@ -216,9 +218,23 @@ defmodule API.Client.ChannelTest do
|
||||
})
|
||||
|> subscribe_and_join(API.Client.Channel, "client")
|
||||
|
||||
assert_push "disconnect", %{reason: :token_expired}, 250
|
||||
assert_receive {:EXIT, _pid, :shutdown}
|
||||
assert_receive {:socket_close, _pid, :shutdown}
|
||||
token = Repo.get_by(Domain.Tokens.Token, id: subject.token_id)
|
||||
|
||||
data = %{
|
||||
"id" => token.id,
|
||||
"account_id" => token.account_id,
|
||||
"type" => token.type,
|
||||
"expires_at" => token.expires_at
|
||||
}
|
||||
|
||||
Domain.Events.Hooks.Tokens.on_delete(data)
|
||||
|
||||
assert_receive %Phoenix.Socket.Broadcast{
|
||||
topic: topic,
|
||||
event: "disconnect"
|
||||
}
|
||||
|
||||
assert topic == "sessions:#{token.id}"
|
||||
end
|
||||
|
||||
test "selects compatible gateway versions", %{client: client, subject: subject} do
|
||||
|
||||
@@ -126,6 +126,7 @@ defmodule API.Gateway.ChannelTest do
|
||||
Process.flag(:trap_exit, true)
|
||||
|
||||
:ok = Domain.PubSub.Account.subscribe(account.id)
|
||||
:ok = Domain.PubSub.subscribe("sessions:#{token.id}")
|
||||
|
||||
data = %{
|
||||
"id" => token.id,
|
||||
@@ -136,11 +137,14 @@ defmodule API.Gateway.ChannelTest do
|
||||
Events.Hooks.Tokens.on_delete(data)
|
||||
|
||||
assert_receive {:deleted, deleted_token}
|
||||
assert_push "disconnect", payload
|
||||
assert_receive {:EXIT, _pid, _}
|
||||
assert_receive {:socket_close, _pid, _}
|
||||
|
||||
assert_receive %Phoenix.Socket.Broadcast{
|
||||
topic: topic,
|
||||
event: "disconnect"
|
||||
}
|
||||
|
||||
assert topic == "sessions:#{token.id}"
|
||||
assert deleted_token.id == token.id
|
||||
assert payload == %{"reason" => "token_expired"}
|
||||
end
|
||||
|
||||
test "pushes allow_access message", %{
|
||||
@@ -294,7 +298,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"id" => flow1.id,
|
||||
"client_id" => client.id,
|
||||
"resource_id" => resource.id,
|
||||
"account_id" => account.id
|
||||
"account_id" => account.id,
|
||||
"token_id" => flow1.token_id,
|
||||
"policy_id" => flow1.policy_id,
|
||||
"actor_group_membership_id" => flow1.actor_group_membership_id,
|
||||
"expires_at" => flow1.expires_at
|
||||
}
|
||||
|
||||
send(
|
||||
@@ -361,7 +369,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_id" => client.id,
|
||||
"resource_id" => resource.id,
|
||||
"account_id" => account.id,
|
||||
"gateway_id" => gateway.id
|
||||
"gateway_id" => gateway.id,
|
||||
"token_id" => flow.token_id,
|
||||
"actor_group_membership_id" => flow.actor_group_membership_id,
|
||||
"policy_id" => flow.policy_id,
|
||||
"expires_at" => flow.expires_at
|
||||
}
|
||||
|
||||
send(
|
||||
@@ -499,7 +511,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_id" => other_flow1.client_id,
|
||||
"resource_id" => other_flow1.resource_id,
|
||||
"account_id" => other_flow1.account_id,
|
||||
"gateway_id" => other_flow1.gateway_id
|
||||
"gateway_id" => other_flow1.gateway_id,
|
||||
"token_id" => other_flow1.token_id,
|
||||
"policy_id" => other_flow1.policy_id,
|
||||
"actor_group_membership_id" => other_flow1.actor_group_membership_id,
|
||||
"expires_at" => other_flow1.expires_at
|
||||
}
|
||||
|
||||
Events.Hooks.Flows.on_delete(data)
|
||||
@@ -517,7 +533,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_id" => other_flow2.client_id,
|
||||
"resource_id" => other_flow2.resource_id,
|
||||
"account_id" => other_flow2.account_id,
|
||||
"gateway_id" => other_flow2.gateway_id
|
||||
"gateway_id" => other_flow2.gateway_id,
|
||||
"token_id" => other_flow2.token_id,
|
||||
"policy_id" => other_flow2.policy_id,
|
||||
"actor_group_membership_id" => other_flow2.actor_group_membership_id,
|
||||
"expires_at" => other_flow2.expires_at
|
||||
}
|
||||
|
||||
Events.Hooks.Flows.on_delete(data)
|
||||
@@ -944,7 +964,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_id" => client.id,
|
||||
"resource_id" => resource.id,
|
||||
"account_id" => account.id,
|
||||
"gateway_id" => gateway.id
|
||||
"gateway_id" => gateway.id,
|
||||
"token_id" => flow.token_id,
|
||||
"actor_group_membership_id" => flow.actor_group_membership_id,
|
||||
"policy_id" => flow.policy_id,
|
||||
"expires_at" => flow.expires_at
|
||||
}
|
||||
|
||||
Events.Hooks.Flows.on_delete(data)
|
||||
@@ -1074,7 +1098,11 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_id" => client.id,
|
||||
"resource_id" => resource.id,
|
||||
"account_id" => account.id,
|
||||
"gateway_id" => gateway.id
|
||||
"gateway_id" => gateway.id,
|
||||
"token_id" => flow.token_id,
|
||||
"actor_group_membership_id" => flow.actor_group_membership_id,
|
||||
"policy_id" => flow.policy_id,
|
||||
"expires_at" => flow.expires_at
|
||||
}
|
||||
|
||||
Events.Hooks.Flows.on_delete(data)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
defmodule Domain.Flows do
|
||||
alias Domain.Repo
|
||||
alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies}
|
||||
alias Domain.{Auth, Actors, Clients, Gateways, Resources, Policies, Tokens}
|
||||
alias Domain.Flows.{Authorizer, Flow}
|
||||
require Ecto.Query
|
||||
require Logger
|
||||
@@ -34,6 +34,11 @@ defmodule Domain.Flows do
|
||||
expires_at
|
||||
) do
|
||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()) do
|
||||
# Set the expiration time for the authorization. For normal user clients, this is 7 days, defined in Domain.Auth.
|
||||
# For service accounts using a headless client, it's configurable. This will always be set.
|
||||
# We always cap this by the subject's expires_at so that we synchronize removal of the flow on the gateway
|
||||
# with the websocket connection expiration on the client.
|
||||
|
||||
flow =
|
||||
Flow.Changeset.create(%{
|
||||
token_id: token_id,
|
||||
@@ -59,19 +64,28 @@ defmodule Domain.Flows do
|
||||
# This can happen if a Policy was created that grants redundant access to a client that
|
||||
# is already connected to the Resource, then the initial Policy is deleted.
|
||||
#
|
||||
# We need to create a new flow with the new Policy but the same (or shorter) expiration as
|
||||
# the old flow.
|
||||
# We need to create a new flow with the new Policy. Unfortunately getting the expiration
|
||||
# right is a bit tricky, since we need to synchronize this with the client's state.
|
||||
# If we expire the flow too early, the client will lose access to the Resource without
|
||||
# any change in client state. If we expire the flow too late, the client will have access
|
||||
# to the Resource beyond its intended expiration time.
|
||||
#
|
||||
# So, we use the minimum of either the policy condition or the origin flow's expiration time.
|
||||
def reauthorize_flow(%Flow{} = flow) do
|
||||
with {:ok, client} <- Clients.fetch_client_by_id(flow.client_id, preload: :identity),
|
||||
# TODO: Hard delete
|
||||
# We need to ensure token and gateway haven't been deleted after the initial flow was created
|
||||
# This can be removed after hard-delete since we'll get a DB error if these associations no longer exist
|
||||
{:ok, _token} <- Tokens.fetch_token_by_id(flow.token_id),
|
||||
{:ok, _gateway} <- Gateways.fetch_gateway_by_id(flow.gateway_id),
|
||||
policies when policies != [] <-
|
||||
Policies.all_policies_for_resource_id!(
|
||||
Policies.all_policies_for_resource_id_and_actor_id!(
|
||||
flow.account_id,
|
||||
flow.resource_id
|
||||
flow.resource_id,
|
||||
client.actor_id
|
||||
),
|
||||
conforming_policies when conforming_policies != [] <-
|
||||
Policies.filter_by_conforming_policies_for_client(policies, client),
|
||||
policy <- Enum.at(conforming_policies, 0),
|
||||
{:ok, expires_at} <- Policies.ensure_client_conforms_policy_conditions(client, policy),
|
||||
{:ok, expires_at, policy} <-
|
||||
Policies.longest_conforming_policy_for_client(policies, client, flow.expires_at),
|
||||
{:ok, membership} <-
|
||||
Actors.fetch_membership_by_actor_id_and_group_id(
|
||||
client.actor_id,
|
||||
@@ -80,7 +94,7 @@ defmodule Domain.Flows do
|
||||
{:ok, new_flow} <-
|
||||
Flow.Changeset.create(%{
|
||||
token_id: flow.token_id,
|
||||
policy_id: Enum.at(conforming_policies, 0).id,
|
||||
policy_id: policy.id,
|
||||
client_id: flow.client_id,
|
||||
gateway_id: flow.gateway_id,
|
||||
resource_id: flow.resource_id,
|
||||
@@ -89,13 +103,16 @@ defmodule Domain.Flows do
|
||||
client_remote_ip: client.last_seen_remote_ip,
|
||||
client_user_agent: client.last_seen_user_agent,
|
||||
gateway_remote_ip: flow.gateway_remote_ip,
|
||||
expires_at: expires_at || flow.expires_at
|
||||
expires_at: expires_at
|
||||
})
|
||||
|> Repo.insert() do
|
||||
{:ok, new_flow}
|
||||
else
|
||||
reason ->
|
||||
Logger.info("Failed to reauthorize flow: #{inspect(reason)}")
|
||||
Logger.info("Failed to reauthorize flow",
|
||||
reason: inspect(reason)
|
||||
)
|
||||
|
||||
{:error, :forbidden}
|
||||
end
|
||||
end
|
||||
@@ -209,10 +226,15 @@ defmodule Domain.Flows do
|
||||
|> Repo.delete_all()
|
||||
end
|
||||
|
||||
def delete_flows_for(%Domain.Tokens.Token{} = token) do
|
||||
def delete_flows_for(%Domain.Tokens.Token{account_id: nil}) do
|
||||
# Tokens without an account_id are not associated with any flows. I.e. global relay tokens
|
||||
{0, []}
|
||||
end
|
||||
|
||||
def delete_flows_for(%Domain.Tokens.Token{id: id, account_id: account_id}) do
|
||||
Flow.Query.all()
|
||||
|> Flow.Query.by_account_id(token.account_id)
|
||||
|> Flow.Query.by_token_id(token.id)
|
||||
|> Flow.Query.by_account_id(account_id)
|
||||
|> Flow.Query.by_token_id(id)
|
||||
|> Repo.delete_all()
|
||||
end
|
||||
|
||||
|
||||
@@ -23,6 +23,12 @@ defmodule Domain.Gateways do
|
||||
|> Repo.aggregate(:count)
|
||||
end
|
||||
|
||||
def fetch_gateway_by_id(id) do
|
||||
Gateway.Query.not_deleted()
|
||||
|> Gateway.Query.by_id(id)
|
||||
|> Repo.fetch(Gateway.Query, [])
|
||||
end
|
||||
|
||||
def fetch_group_by_id(id, %Auth.Subject{} = subject, opts \\ []) do
|
||||
required_permissions =
|
||||
{:one_of,
|
||||
|
||||
@@ -74,10 +74,11 @@ defmodule Domain.Policies do
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def all_policies_for_resource_id!(account_id, resource_id) do
|
||||
def all_policies_for_resource_id_and_actor_id!(account_id, resource_id, actor_id) do
|
||||
Policy.Query.not_disabled()
|
||||
|> Policy.Query.by_account_id(account_id)
|
||||
|> Policy.Query.by_resource_id(resource_id)
|
||||
|> Policy.Query.by_actor_id(actor_id)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
@@ -187,7 +188,6 @@ defmodule Domain.Policies do
|
||||
def filter_by_conforming_policies_for_client(policies, %Clients.Client{} = client) do
|
||||
Enum.filter(policies, fn policy ->
|
||||
policy.conditions
|
||||
|> Enum.filter(&Condition.Evaluator.evaluable_on_connect?/1)
|
||||
|> Condition.Evaluator.ensure_conforms(client)
|
||||
|> case do
|
||||
{:ok, _expires_at} -> true
|
||||
@@ -196,7 +196,55 @@ defmodule Domain.Policies do
|
||||
end)
|
||||
end
|
||||
|
||||
def ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do
|
||||
@infinity ~U[9999-12-31 23:59:59.999999Z]
|
||||
|
||||
def longest_conforming_policy_for_client(policies, client, token_expires_at) do
|
||||
policies
|
||||
|> Enum.reduce(%{failed: [], succeeded: []}, fn policy, acc ->
|
||||
case ensure_client_conforms_policy_conditions(client, policy) do
|
||||
{:ok, expires_at} ->
|
||||
%{acc | succeeded: [{expires_at, policy} | acc.succeeded]}
|
||||
|
||||
{:error, {:forbidden, violated_properties: violated_properties}} ->
|
||||
%{acc | failed: acc.failed ++ violated_properties}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
%{succeeded: [], failed: failed} ->
|
||||
{:error, {:forbidden, violated_properties: Enum.uniq(failed)}}
|
||||
|
||||
%{succeeded: succeeded} ->
|
||||
{expires_at, policy} =
|
||||
succeeded
|
||||
|> Enum.max_by(fn {expires_at, _policy} -> expires_at || @infinity end)
|
||||
|
||||
{:ok, min_expires_at(expires_at, token_expires_at), policy}
|
||||
end
|
||||
end
|
||||
|
||||
# All client tokens have *some* expiration
|
||||
def min_expires_at(nil, nil),
|
||||
do: raise("Both policy_expires_at and token_expires_at cannot be nil")
|
||||
|
||||
def min_expires_at(nil, token_expires_at), do: token_expires_at
|
||||
|
||||
def min_expires_at(%DateTime{} = policy_expires_at, %DateTime{} = token_expires_at) do
|
||||
if DateTime.compare(policy_expires_at, token_expires_at) == :lt do
|
||||
policy_expires_at
|
||||
else
|
||||
token_expires_at
|
||||
end
|
||||
end
|
||||
|
||||
defp ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do
|
||||
if subject.account.id == policy.account_id do
|
||||
:ok
|
||||
else
|
||||
{:error, :unauthorized}
|
||||
end
|
||||
end
|
||||
|
||||
defp ensure_client_conforms_policy_conditions(%Clients.Client{} = client, %Policy{} = policy) do
|
||||
case Condition.Evaluator.ensure_conforms(policy.conditions, client) do
|
||||
{:ok, expires_at} ->
|
||||
{:ok, expires_at}
|
||||
@@ -205,12 +253,4 @@ defmodule Domain.Policies do
|
||||
{:error, {:forbidden, violated_properties: violated_properties}}
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_has_access_to(%Auth.Subject{} = subject, %Policy{} = policy) do
|
||||
if subject.account.id == policy.account_id do
|
||||
:ok
|
||||
else
|
||||
{:error, :unauthorized}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -4,13 +4,6 @@ defmodule Domain.Policies.Condition.Evaluator do
|
||||
|
||||
@days_of_week ~w[M T W R F S U]
|
||||
|
||||
@doc """
|
||||
Returns `true` if the condition can be evaluated during connection (eg. for IP address matching
|
||||
it can't change while socket is open), otherwise `false`.
|
||||
"""
|
||||
def evaluable_on_connect?(%Condition{property: "current_utc_datetime"}), do: false
|
||||
def evaluable_on_connect?(_), do: true
|
||||
|
||||
def ensure_conforms([], %Clients.Client{}) do
|
||||
{:ok, nil}
|
||||
end
|
||||
|
||||
@@ -5,6 +5,8 @@ defmodule Domain.PubSub do
|
||||
"""
|
||||
use Supervisor
|
||||
|
||||
require Logger
|
||||
|
||||
def start_link(opts) do
|
||||
Supervisor.start_link(__MODULE__, opts)
|
||||
end
|
||||
@@ -49,6 +51,12 @@ defmodule Domain.PubSub do
|
||||
|> Domain.PubSub.subscribe()
|
||||
end
|
||||
|
||||
def broadcast(nil, payload) do
|
||||
Logger.warning("Broadcasting to nil account_id is not allowed",
|
||||
payload: inspect(payload)
|
||||
)
|
||||
end
|
||||
|
||||
def broadcast(account_id, payload) do
|
||||
account_id
|
||||
|> topic()
|
||||
|
||||
@@ -19,6 +19,13 @@ defmodule Domain.Tokens do
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def fetch_token_by_id(id) do
|
||||
Token.Query.not_deleted()
|
||||
|> Token.Query.not_expired()
|
||||
|> Token.Query.by_id(id)
|
||||
|> Repo.fetch(Token.Query, [])
|
||||
end
|
||||
|
||||
def fetch_token_by_id(id, %Auth.Subject{} = subject, opts \\ []) do
|
||||
required_permissions =
|
||||
{:one_of,
|
||||
|
||||
@@ -6,13 +6,23 @@ defmodule Domain.PoliciesTest do
|
||||
setup do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account)
|
||||
actor_group = Fixtures.Actors.create_group(account: account)
|
||||
identity = Fixtures.Auth.create_identity(account: account, actor: actor)
|
||||
subject = Fixtures.Auth.create_subject(identity: identity)
|
||||
resource = Fixtures.Resources.create_resource(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group
|
||||
)
|
||||
|
||||
%{
|
||||
account: account,
|
||||
actor: actor,
|
||||
actor_group: actor_group,
|
||||
identity: identity,
|
||||
resource: resource,
|
||||
subject: subject
|
||||
}
|
||||
end
|
||||
@@ -704,10 +714,7 @@ defmodule Domain.PoliciesTest do
|
||||
end
|
||||
|
||||
describe "delete_policies_for/1" do
|
||||
setup %{account: account, subject: subject} do
|
||||
resource = Fixtures.Resources.create_resource(account: account)
|
||||
actor_group = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
setup %{resource: resource, actor_group: actor_group, account: account, subject: subject} do
|
||||
policy =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
@@ -843,42 +850,365 @@ defmodule Domain.PoliciesTest do
|
||||
end
|
||||
end
|
||||
|
||||
describe "ensure_client_conforms_policy_conditions/2" do
|
||||
test "returns :ok when client conforms to policy conditions", %{} do
|
||||
client = %Domain.Clients.Client{
|
||||
last_seen_remote_ip_location_region: "US"
|
||||
}
|
||||
|
||||
policy = %Policies.Policy{
|
||||
conditions: [
|
||||
%Policies.Condition{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["US"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
assert ensure_client_conforms_policy_conditions(client, policy) == {:ok, nil}
|
||||
describe "filter_by_conforming_policies_for_client/2" do
|
||||
test "returns empty list when there are no policies", %{} do
|
||||
client = Fixtures.Clients.create_client()
|
||||
assert filter_by_conforming_policies_for_client([], client) == []
|
||||
end
|
||||
|
||||
test "returns error when client conforms to policy conditions", %{} do
|
||||
client = %Domain.Clients.Client{
|
||||
last_seen_remote_ip_location_region: "US"
|
||||
}
|
||||
test "filters based on policy conditions", %{
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor: actor,
|
||||
actor_group: actor_group
|
||||
} do
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
actor_group2 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
policy = %Policies.Policy{
|
||||
conditions: [
|
||||
%Policies.Condition{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["CA"]
|
||||
}
|
||||
]
|
||||
}
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group2
|
||||
)
|
||||
|
||||
assert ensure_client_conforms_policy_conditions(client, policy) ==
|
||||
{:error, {:forbidden, [violated_properties: [:remote_ip_location_region]]}}
|
||||
actor_group3 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group3
|
||||
)
|
||||
|
||||
actor_group4 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group4
|
||||
)
|
||||
|
||||
actor_group5 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group5
|
||||
)
|
||||
|
||||
actor_group6 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group6
|
||||
)
|
||||
|
||||
policy1 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group,
|
||||
conditions: [
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["CA"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy2 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group2,
|
||||
conditions: [
|
||||
%{
|
||||
property: :remote_ip,
|
||||
operator: :is_in_cidr,
|
||||
values: ["1.1.1.1/32"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy3 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group3,
|
||||
conditions: [
|
||||
%{
|
||||
property: :provider_id,
|
||||
operator: :is_in,
|
||||
values: [Ecto.UUID.generate()]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy4 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group4,
|
||||
conditions: [
|
||||
%{
|
||||
property: :current_utc_datetime,
|
||||
operator: :is_in_day_of_week_time_ranges,
|
||||
values: ["M/13:00:00-13:00:00/UTC"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy5 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group5,
|
||||
conditions: [
|
||||
%{
|
||||
property: :client_verified,
|
||||
operator: :is,
|
||||
values: ["true"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy6 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group6,
|
||||
conditions: []
|
||||
)
|
||||
|
||||
assert filter_by_conforming_policies_for_client(
|
||||
[policy1, policy2, policy3, policy4, policy5, policy6],
|
||||
client
|
||||
) == [policy6]
|
||||
end
|
||||
end
|
||||
|
||||
describe "longest_conforming_policy_for_client/3" do
|
||||
test "returns forbidden when there are no matching policies", %{
|
||||
account: account,
|
||||
actor: actor,
|
||||
subject: subject
|
||||
} do
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
|
||||
assert longest_conforming_policy_for_client([], client, subject.expires_at) ==
|
||||
{:error, {:forbidden, violated_properties: []}}
|
||||
end
|
||||
|
||||
test "accumulates and uniqs violated properties", %{
|
||||
account: account,
|
||||
actor: actor,
|
||||
actor_group: actor_group,
|
||||
resource: resource,
|
||||
subject: subject
|
||||
} do
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
|
||||
policy =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group,
|
||||
conditions: [
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["MX"]
|
||||
},
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["BR"]
|
||||
},
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["TF"]
|
||||
},
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["IT"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
actor_group2 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group2
|
||||
)
|
||||
|
||||
actor_group3 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group3
|
||||
)
|
||||
|
||||
actor_group4 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group4
|
||||
)
|
||||
|
||||
policy2 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group2,
|
||||
conditions: [
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["CA"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy3 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group3,
|
||||
conditions: [
|
||||
%{
|
||||
property: :remote_ip_location_region,
|
||||
operator: :is_in,
|
||||
values: ["RU"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy4 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group4,
|
||||
conditions: [
|
||||
%{
|
||||
property: :current_utc_datetime,
|
||||
operator: :is_in_day_of_week_time_ranges,
|
||||
values: ["M/13:00:00-13:00:00/UTC"]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert {:error, {:forbidden, violated_properties: violated_properties}} =
|
||||
longest_conforming_policy_for_client(
|
||||
[policy, policy2, policy3, policy4],
|
||||
client,
|
||||
subject.expires_at
|
||||
)
|
||||
|
||||
assert Enum.sort(violated_properties) ==
|
||||
Enum.sort([
|
||||
:remote_ip_location_region,
|
||||
:current_utc_datetime
|
||||
])
|
||||
end
|
||||
|
||||
test "returns expiration of longest conforming policy when less than passed default", %{
|
||||
account: account,
|
||||
actor: actor,
|
||||
actor_group: actor_group,
|
||||
resource: resource
|
||||
} do
|
||||
# Build relative time ranges based on current time
|
||||
now = DateTime.utc_now()
|
||||
|
||||
# Get tomorrow as a letter - allows for consistent testing
|
||||
day_letter =
|
||||
case Date.day_of_week(now) do
|
||||
# Monday
|
||||
1 -> "M"
|
||||
# Tuesday
|
||||
2 -> "T"
|
||||
# Wednesday
|
||||
3 -> "W"
|
||||
# Thursday
|
||||
4 -> "R"
|
||||
# Friday
|
||||
5 -> "F"
|
||||
# Saturday
|
||||
6 -> "S"
|
||||
# Sunday
|
||||
7 -> "U"
|
||||
end
|
||||
|
||||
time_range_1 = "#{day_letter}/00:00:00-23:59:59/UTC"
|
||||
time_range_2 = "#{day_letter}/00:00:00-23:59:58/UTC"
|
||||
|
||||
# Policy2: 1-hour window (shorter policy)
|
||||
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
actor_group2 = Fixtures.Actors.create_group(account: account)
|
||||
|
||||
Fixtures.Actors.create_membership(
|
||||
account: account,
|
||||
actor: actor,
|
||||
group: actor_group2
|
||||
)
|
||||
|
||||
policy1 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group,
|
||||
conditions: [
|
||||
%{
|
||||
property: :current_utc_datetime,
|
||||
operator: :is_in_day_of_week_time_ranges,
|
||||
values: [time_range_1]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
policy2 =
|
||||
Fixtures.Policies.create_policy(
|
||||
account: account,
|
||||
resource: resource,
|
||||
actor_group: actor_group2,
|
||||
conditions: [
|
||||
%{
|
||||
property: :current_utc_datetime,
|
||||
operator: :is_in_day_of_week_time_ranges,
|
||||
values: [time_range_2]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
in_three_days = DateTime.utc_now() |> DateTime.add(3, :day)
|
||||
|
||||
assert {:ok, expires_at, ^policy1} =
|
||||
longest_conforming_policy_for_client(
|
||||
[policy1, policy2],
|
||||
client,
|
||||
in_three_days
|
||||
)
|
||||
|
||||
assert DateTime.compare(expires_at, in_three_days) == :lt
|
||||
|
||||
in_one_minute = DateTime.utc_now() |> DateTime.add(1, :minute)
|
||||
|
||||
assert {:ok, expires_at, ^policy1} =
|
||||
longest_conforming_policy_for_client(
|
||||
[policy1, policy2],
|
||||
client,
|
||||
in_one_minute
|
||||
)
|
||||
|
||||
assert DateTime.compare(expires_at, in_one_minute) == :eq
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user