fix(portal): reauthorize new flow when last flow deleted (#9974)

The `flows` table tracks authorizations we've made for a resource and
persists them, so that we can determine which authorizations are still
valid across deploys or hiccups in the control plane connections.

Before, when the "in-use" authorization for a resource was deleted, we
would have flapped the resource in the client, and sent `reject_access`
to the gateway. However, that would cause issues in the following edge
case:

- Client is currently connected to Resource A through Policy B
- Client websocket goes down
- Policy B is created for Resource A (for another actor group), and
Policy A is deleted by admin
- Client reconnects
- Client sees that its resource list is the same
- Gateway has since received `reject_access` because no new flows were
created for this client-resource combination

To prevent this from happening, we now try to "reauthorize" the flow
whenever the last cached flow is removed for a particular
client-resource pair. This avoids needing to toggle the resource on the
client since we won't have sent `reject_access` to the gateway.
This commit is contained in:
Jamil
2025-07-24 21:53:10 -04:00
committed by GitHub
parent 083d0ac0f4
commit ccc736e63e
22 changed files with 389 additions and 204 deletions

View File

@@ -63,14 +63,11 @@ defmodule API.Client.Channel do
# Called immediately after the client joins the channel
def handle_info(:after_join, socket) do
# Initialize the cache. Flows contains active flows for this client, so we
# can toggle the affected resource if the active flow is deleted. That will allow
# the client to create a new flow if the resource is still authorized.
# Initialize the cache.
socket =
socket
|> hydrate_policies_and_resources()
|> hydrate_membership_group_ids()
|> assign(flows: MapSet.new())
|> hydrate_memberships()
# Initialize relays
{:ok, relays} = select_relays(socket)
@@ -138,7 +135,7 @@ defmodule API.Client.Channel do
# ACTOR_GROUP_MEMBERSHIPS
def handle_info(
{:created, %Actors.Membership{actor_id: actor_id, group_id: group_id}},
{:created, %Actors.Membership{actor_id: actor_id, group_id: group_id} = membership},
%{assigns: %{client: %{actor_id: id}}} = socket
)
when id == actor_id do
@@ -152,8 +149,8 @@ defmodule API.Client.Channel do
socket = hydrate_policies_and_resources(socket)
# 3. Update our membership group IDs
ids = MapSet.put(socket.assigns.membership_group_ids, group_id)
socket = assign(socket, membership_group_ids: ids)
memberships = Map.put(socket.assigns.memberships, group_id, membership)
socket = assign(socket, memberships: memberships)
# 3. Get new resources
new_resources = authorized_resources(socket) -- existing_resources
@@ -192,13 +189,13 @@ defmodule API.Client.Channel do
r_ids = Enum.map(policies, fn {_id, policy} -> policy.resource_id end) |> Enum.uniq()
resources = Map.take(socket.assigns.resources, r_ids)
membership_group_ids = Map.delete(socket.assigns.membership_group_ids, group_id)
memberships = Map.delete(socket.assigns.memberships, group_id)
socket =
socket
|> assign(policies: policies)
|> assign(resources: resources)
|> assign(membership_group_ids: membership_group_ids)
|> assign(memberships: memberships)
{:noreply, socket}
end
@@ -244,38 +241,6 @@ defmodule API.Client.Channel do
disconnect(socket)
end
# FLOWS
def handle_info(
{:deleted, %Flows.Flow{client_id: client_id} = flow},
%{assigns: %{client: %{id: id}}} = socket
)
when client_id == id do
if MapSet.member?(socket.assigns.flows, flow.id) do
# If an active flow is deleted, we need to recreate it.
# To do that, we need to flap the resource on the client because it doesn't track flows.
# The gateway is also tracking flows and will have sent a reject_access for this client/resource
# if this was the last flow in its cache that was authorizing it.
push(socket, "resource_deleted", flow.resource_id)
resource = Map.get(socket.assigns.resources, flow.resource_id)
# Access to resource is still allowed, allow creating a new flow
if not is_nil(resource) and resource.id in Enum.map(authorized_resources(socket), & &1.id) do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
else
Logger.warning("Active flow deleted for resource but resource not found in socket state",
resource_id: flow.resource_id,
flow_id: flow.id
)
end
{:noreply, assign(socket, flows: MapSet.delete(socket.assigns.flows, flow.id))}
else
{:noreply, socket}
end
end
# GATEWAY_GROUPS
def handle_info(
@@ -307,7 +272,7 @@ defmodule API.Client.Channel do
def handle_info({:created, %Policies.Policy{} = policy}, socket) do
# 1. Check if this policy is for us
if MapSet.member?(socket.assigns.membership_group_ids, policy.actor_group_id) do
if Map.has_key?(socket.assigns.memberships, policy.actor_group_id) do
# 2. Snapshot existing resources
existing_resources = authorized_resources(socket)
@@ -731,7 +696,9 @@ defmodule API.Client.Channel do
gateway,
resource_id,
policy,
socket.assigns.subject
Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id,
socket.assigns.subject,
expires_at
)
preshared_key = generate_preshared_key()
@@ -751,8 +718,6 @@ defmodule API.Client.Channel do
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->
@@ -886,7 +851,9 @@ defmodule API.Client.Channel do
gateway,
resource_id,
policy,
socket.assigns.subject
Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id,
socket.assigns.subject,
expires_at
)
:ok =
@@ -902,8 +869,6 @@ defmodule API.Client.Channel do
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->
@@ -946,7 +911,9 @@ defmodule API.Client.Channel do
gateway,
resource_id,
policy,
socket.assigns.subject
Map.fetch!(socket.assigns.memberships, policy.actor_group_id).id,
socket.assigns.subject,
expires_at
)
:ok =
@@ -963,8 +930,6 @@ defmodule API.Client.Channel do
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->
@@ -1206,16 +1171,19 @@ defmodule API.Client.Channel do
end
end
defp hydrate_membership_group_ids(socket) do
OpenTelemetry.Tracer.with_span "client.hydrate_membership_group_ids",
defp hydrate_memberships(socket) do
OpenTelemetry.Tracer.with_span "client.hydrate_memberships",
attributes: %{
account_id: socket.assigns.client.account_id
} do
membership_group_ids =
Actors.all_actor_group_ids!(socket.assigns.subject.actor)
|> MapSet.new()
memberships =
Actors.all_memberships_for_actor!(socket.assigns.subject.actor)
|> Enum.map(fn membership ->
{membership.group_id, membership}
end)
|> Enum.into(%{})
assign(socket, membership_group_ids: membership_group_ids)
assign(socket, memberships: memberships)
end
end
@@ -1240,7 +1208,7 @@ defmodule API.Client.Channel do
end
end
# Returns either the authorized resource or an error tuple of violated properties
# Returns either the authorized policy or an error tuple of violated properties
defp authorize_resource(socket, resource_id) do
OpenTelemetry.Tracer.with_span "client.authorize_resource",
attributes: %{
@@ -1264,7 +1232,11 @@ defmodule API.Client.Channel do
{:ok, policy, expires_at} ->
# Set a maximum expiration time for the authorization
{:ok, policy, expires_at || socket.assigns.subject.expires_at}
expires_at =
expires_at || socket.assigns.subject.expires_at ||
DateTime.utc_now() |> DateTime.add(14, :day)
{:ok, policy, expires_at}
end
end
end

View File

@@ -111,19 +111,31 @@ defmodule API.Gateway.Channel do
tuple = {flow.client_id, flow.resource_id}
socket =
if flow_map = Map.get(socket.assigns.flows, tuple) do
flow_map = Map.delete(flow_map, flow.id)
if flows_map = Map.get(socket.assigns.flows, tuple) do
flow_map = Map.delete(flows_map, flow.id)
if map_size(flow_map) == 0 do
# Send reject_access if this was the last flow granting access for this client/resource
push(socket, "reject_access", %{
client_id: flow.client_id,
resource_id: flow.resource_id
})
with 0 <- map_size(flow_map),
{:ok, new_flow} <- Flows.reauthorize_flow(flow) do
flow_map = %{
new_flow.id => new_flow.expires_at
}
push(
socket,
"access_authorization_expiry_updated",
Views.Flow.render(new_flow, new_flow.expires_at)
)
assign(socket, flows: Map.delete(socket.assigns.flows, tuple))
else
assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))
else
_ ->
# Send reject_access if access is no longer granted
push(socket, "reject_access", %{
client_id: flow.client_id,
resource_id: flow.resource_id
})
assign(socket, flows: Map.delete(socket.assigns.flows, tuple))
end
else
socket
@@ -144,7 +156,7 @@ defmodule API.Gateway.Channel do
when old_filters != filters do
has_flows? =
socket.assigns.flows
|> Enum.any?(fn {{_client_id, resource_id}, _expires_at} -> resource_id == id end)
|> Enum.any?(fn {{_client_id, resource_id}, _flow_map} -> resource_id == id end)
if has_flows? do
push(socket, "resource_updated", Views.Resource.render(resource))
@@ -214,49 +226,48 @@ defmodule API.Gateway.Channel do
payload: %{joins: joins}
},
socket
) do
if Enum.count(joins) > 0 do
{:ok, relays} = select_relays(socket)
)
when map_size(joins) > 0 do
{:ok, relays} = select_relays(socket)
if length(relays) > 0 do
relay_credentials_expire_at = DateTime.utc_now() |> DateTime.add(90, :day)
if length(relays) > 0 do
relay_credentials_expire_at = DateTime.utc_now() |> DateTime.add(90, :day)
:ok =
Relays.unsubscribe_from_relays_presence_in_account(socket.assigns.gateway.account_id)
:ok =
Relays.unsubscribe_from_relays_presence_in_account(socket.assigns.gateway.account_id)
:ok =
Enum.each(relays, fn relay ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Relays.unsubscribe_from_relay_presence(relay)
:ok = Relays.subscribe_to_relay_presence(relay)
end)
:ok =
Enum.each(relays, fn relay ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Relays.unsubscribe_from_relay_presence(relay)
:ok = Relays.subscribe_to_relay_presence(relay)
end)
# Cache new stamp secrets
socket = Debouncer.cache_stamp_secrets(socket, relays)
# Cache new stamp secrets
socket = Debouncer.cache_stamp_secrets(socket, relays)
# If a relay reconnects with a different stamp_secret, disconnect them immediately
joined_ids = Map.keys(joins)
# If a relay reconnects with a different stamp_secret, disconnect them immediately
joined_ids = Map.keys(joins)
{socket, disconnected_ids} =
Debouncer.cancel_leaves_or_disconnect_immediately(
socket,
joined_ids,
socket.assigns.gateway.account_id
{socket, disconnected_ids} =
Debouncer.cancel_leaves_or_disconnect_immediately(
socket,
joined_ids,
socket.assigns.gateway.account_id
)
push(socket, "relays_presence", %{
disconnected_ids: disconnected_ids,
connected:
Views.Relay.render_many(
relays,
socket.assigns.gateway.public_key,
relay_credentials_expire_at
)
})
push(socket, "relays_presence", %{
disconnected_ids: disconnected_ids,
connected:
Views.Relay.render_many(
relays,
socket.assigns.gateway.public_key,
relay_credentials_expire_at
)
})
{:noreply, socket}
end
{:noreply, socket}
else
{:noreply, socket}
end
@@ -321,11 +332,7 @@ defmodule API.Gateway.Channel do
gateway_ice_credentials: ice_credentials.gateway,
client: Views.Client.render(client, preshared_key),
client_ice_credentials: ice_credentials.client,
# Gateway manages its own expiration
expires_at:
if(authorization_expires_at,
do: DateTime.to_unix(authorization_expires_at, :second)
)
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
# Start tracking flow
@@ -366,13 +373,11 @@ defmodule API.Gateway.Channel do
{channel_pid, socket_ref, resource.id}
)
expires_at = DateTime.to_unix(authorization_expires_at, :second)
push(socket, "allow_access", %{
ref: ref,
client_id: client.id,
resource: Views.Resource.render(resource),
expires_at: expires_at,
expires_at: DateTime.to_unix(authorization_expires_at, :second),
payload: payload,
client_ipv4: client.ipv4,
client_ipv6: client.ipv6
@@ -421,13 +426,11 @@ defmodule API.Gateway.Channel do
{channel_pid, socket_ref, resource.id}
)
expires_at = DateTime.to_unix(authorization_expires_at, :second)
push(socket, "request_connection", %{
ref: ref,
resource: Views.Resource.render(resource),
client: Views.Client.render(client, payload, preshared_key),
expires_at: expires_at
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
# Start tracking the flow
@@ -638,10 +641,7 @@ defmodule API.Gateway.Channel do
# This data structure is used to efficiently:
# 1. Check if there are any active flows remaining for this client/resource?
# 2. Remove a deleted flow
|> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, inserted_at}}, acc ->
# Assume all flows have a 14 day expiration if they still exist
expires_at = DateTime.add(inserted_at, 14, :day)
|> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, expires_at}}, acc ->
flow_id_map = Map.get(acc, {client_id, resource_id}, %{})
Map.put(acc, {client_id, resource_id}, Map.put(flow_id_map, flow_id, expires_at))

View File

@@ -1,10 +1,21 @@
defmodule API.Gateway.Views.Flow do
def render(flow, expires_at) do
%{
client_id: flow.client_id,
resource_id: flow.resource_id,
expires_at: DateTime.to_unix(expires_at, :second)
}
end
def render_many(flows) do
flows
|> Enum.map(fn {{client_id, resource_id}, _flow_map} ->
|> Enum.map(fn {{client_id, resource_id}, flow_map} ->
expires_at = Enum.min(Map.values(flow_map))
%{
client_id: client_id,
resource_id: resource_id
resource_id: resource_id,
expires_at: DateTime.to_unix(expires_at, :second)
}
end)
end

View File

@@ -1073,7 +1073,6 @@ defmodule API.Client.ChannelTest do
"connected_gateway_ids" => []
})
# assert_reply ref, :error, %{reason: :not_found}
assert_push "flow_creation_failed", %{reason: :not_found, resource_id: ^resource_id}
end
@@ -1098,7 +1097,6 @@ defmodule API.Client.ChannelTest do
"connected_gateway_ids" => []
})
# assert_reply ref, :error, %{reason: :offline}
assert_push "flow_creation_failed", %{reason: :offline, resource_id: resource_id}
assert resource_id == resource.id
end
@@ -1119,8 +1117,6 @@ defmodule API.Client.ChannelTest do
push(socket, "create_flow", attrs)
# assert_reply ref, :error, %{reason: :not_found}
assert_push "flow_creation_failed", %{reason: :not_found, resource_id: resource_id}
assert resource_id == resource.id
end

View File

@@ -1027,45 +1027,6 @@ defmodule API.Gateway.ChannelTest do
DateTime.truncate(expires_at, :second)
end
test "pushes authorize_flow message for authorizations that do not expire", %{
client: client,
gateway: gateway,
resource: resource,
account: account,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
preshared_key = "PSK"
ice_credentials = %{
client: %{username: "A", password: "B"},
gateway: %{username: "C", password: "D"}
}
send(
socket.channel_pid,
{{:authorize_flow, gateway.id}, {channel_pid, socket_ref},
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: nil,
ice_credentials: ice_credentials,
preshared_key: preshared_key
}}
)
assert_push "authorize_flow", %{expires_at: nil}
end
test "authorize_flow tracks flow and sends reject_access when flow is deleted", %{
account: account,
client: client,