fix(portal): don't prematurely reject access (#9952)

Before:

- When a flow was deleted, we flapped the resource on the client, and
sent `reject_access` naively for the flow's `{client_id, resource_id}`
pair on the gateway. This resulted in lots of unneeded resource flappage
on the client whenever bulk flow deletions happened.

After:

- When a flow is deleted, we check if this is an active flow for the
client. If so, we flap the resource then in order to trigger generation
of a new flow. If access was truly affected, that results in a loss of a
resource, we will push `resource_deleted` for the update that triggered
the flow deletion (for example the resource/policy removal). On the
gateway, we only send `reject_access` if it was the last flow granting
access for a particular `client/resource` tuple.


Why:

- While the access state is still correct in the previous
implementation, we run the possibility of pushing way too many resource
flaps to the client in an overly eager attempt to remove access the
client may not have access to.

cc @thomaseizinger 

Related:
https://firezonehq.slack.com/archives/C08FPHECLUF/p1753101115735179
This commit is contained in:
Jamil
2025-07-21 16:12:05 -04:00
committed by GitHub
parent dff6495057
commit 488cb96469
6 changed files with 317 additions and 74 deletions

View File

@@ -63,11 +63,14 @@ defmodule API.Client.Channel do
# Called immediately after the client joins the channel
def handle_info(:after_join, socket) do
# Initialize the cache
# Initialize the cache. Flows contains active flows for this client, so we
# can toggle the affected resource if the active flow is deleted. That will allow
# the client to create a new flow if the resource is still authorized.
socket =
socket
|> hydrate_policies_and_resources()
|> hydrate_membership_group_ids()
|> assign(flows: MapSet.new())
# Initialize relays
{:ok, relays} = select_relays(socket)
@@ -243,20 +246,34 @@ defmodule API.Client.Channel do
# FLOWS
# If we're authorized for a resource for this flow, we need to push resource_deleted
# followed by resource_created_or_updated in order to reset the access state on the
# client.
def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do
# 1. Check if this possibly affects us
if resource = socket.assigns.resources[flow.resource_id] do
# 1. If so, check if we're currently authorized for this resource
if resource.id in Enum.map(authorized_resources(socket), & &1.id) do
push(socket, "resource_deleted", resource.id)
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
end
end
def handle_info(
{:deleted, %Flows.Flow{client_id: client_id} = flow},
%{assigns: %{client: %{id: id}}} = socket
)
when client_id == id do
if MapSet.member?(socket.assigns.flows, flow.id) do
# If an active flow is deleted, we need to recreate it.
# To do that, we need to flap the resource on the client because it doesn't track flows.
# The gateway is also tracking flows and will have sent a reject_access for this client/resource
# if this was the last flow in its cache that was authorizing it.
push(socket, "resource_deleted", flow.resource_id)
{:noreply, socket}
resource = Map.get(socket.assigns.resources, flow.resource_id)
# Access to resource is still allowed, allow creating a new flow
if not is_nil(resource) and resource.id in Enum.map(authorized_resources(socket), & &1.id) do
push(socket, "resource_created_or_updated", Views.Resource.render(resource))
else
Logger.warning("Active flow deleted for resource but resource not found in socket state",
resource_id: flow.resource_id,
flow_id: flow.id
)
end
{:noreply, assign(socket, flows: MapSet.delete(socket.assigns.flows, flow.id))}
else
{:noreply, socket}
end
end
# GATEWAY_GROUPS
@@ -708,7 +725,7 @@ defmodule API.Client.Channel do
# TODO: Optimization
# Move this to a Task.start that completes after broadcasting authorize_flow
{:ok, _flow} =
{:ok, flow} =
Flows.create_flow(
socket.assigns.client,
gateway,
@@ -727,12 +744,15 @@ defmodule API.Client.Channel do
%{
client: socket.assigns.client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->
@@ -860,7 +880,7 @@ defmodule API.Client.Channel do
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
# TODO: Optimization
{:ok, _flow} =
{:ok, flow} =
Flows.create_flow(
socket.assigns.client,
gateway,
@@ -876,11 +896,14 @@ defmodule API.Client.Channel do
%{
client: socket.assigns.client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: payload
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->
@@ -917,7 +940,7 @@ defmodule API.Client.Channel do
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
# TODO: Optimization
{:ok, _flow} =
{:ok, flow} =
Flows.create_flow(
socket.assigns.client,
gateway,
@@ -933,12 +956,15 @@ defmodule API.Client.Channel do
%{
client: socket.assigns.client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload,
client_preshared_key: preshared_key
}}
)
socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id))
{:noreply, socket}
else
{:error, :not_found} ->

View File

@@ -55,11 +55,23 @@ defmodule API.Gateway.Channel do
now = DateTime.utc_now()
# Reject flows older than 14 days
# 1. Remove individual flows older than 14 days, then remove access entry if no flows left
flows =
socket.assigns.flows
|> Enum.reject(fn {_tuple, expires_at} -> DateTime.compare(expires_at, now) == :lt end)
|> Enum.map(fn {tuple, flow_id_map} ->
flow_id_map =
Enum.reject(flow_id_map, fn {_flow_id, expires_at} ->
DateTime.compare(expires_at, now) == :lt
end)
|> Enum.into(%{})
{tuple, flow_id_map}
end)
|> Enum.into(%{})
|> Enum.reject(fn {_tuple, flow_id_map} -> map_size(flow_id_map) == 0 end)
|> Enum.into(%{})
# The gateway has its own flow expiration, so no need to send `reject_access`
{:noreply, assign(socket, flows: flows)}
end
@@ -89,17 +101,30 @@ defmodule API.Gateway.Channel do
# FLOWS
def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do
def handle_info(
{:deleted, %Flows.Flow{gateway_id: gateway_id} = flow},
%{
assigns: %{gateway: %{id: id}}
} = socket
)
when gateway_id == id do
tuple = {flow.client_id, flow.resource_id}
socket =
if Map.has_key?(socket.assigns.flows, tuple) do
push(socket, "reject_access", %{
client_id: flow.client_id,
resource_id: flow.resource_id
})
if flow_map = Map.get(socket.assigns.flows, tuple) do
flow_map = Map.delete(flow_map, flow.id)
assign(socket, flows: Map.delete(socket.assigns.flows, tuple))
if map_size(flow_map) == 0 do
# Send reject_access if this was the last flow granting access for this client/resource
push(socket, "reject_access", %{
client_id: flow.client_id,
resource_id: flow.resource_id
})
assign(socket, flows: Map.delete(socket.assigns.flows, tuple))
else
assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map))
end
else
socket
end
@@ -275,6 +300,7 @@ defmodule API.Gateway.Channel do
%{
client: client,
resource: resource,
flow_id: flow_id,
authorization_expires_at: authorization_expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
@@ -303,7 +329,13 @@ defmodule API.Gateway.Channel do
})
# Start tracking flow
flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at)
tuple = {client.id, resource.id}
flow_map =
Map.get(socket.assigns.flows, tuple, %{})
|> Map.put(flow_id, authorization_expires_at)
flows = Map.put(socket.assigns.flows, tuple, flow_map)
socket = assign(socket, flows: flows)
{:noreply, socket}
@@ -318,6 +350,7 @@ defmodule API.Gateway.Channel do
%{
client: client,
resource: resource,
flow_id: flow_id,
authorization_expires_at: authorization_expires_at,
client_payload: payload
} = attrs
@@ -346,7 +379,13 @@ defmodule API.Gateway.Channel do
})
# Start tracking the flow
flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at)
tuple = {client.id, resource.id}
flow_map =
Map.get(socket.assigns.flows, tuple, %{})
|> Map.put(flow_id, authorization_expires_at)
flows = Map.put(socket.assigns.flows, tuple, flow_map)
socket = assign(socket, flows: flows)
{:noreply, socket}
@@ -365,6 +404,7 @@ defmodule API.Gateway.Channel do
%{
client: client,
resource: resource,
flow_id: flow_id,
authorization_expires_at: authorization_expires_at,
client_payload: payload,
client_preshared_key: preshared_key
@@ -391,7 +431,13 @@ defmodule API.Gateway.Channel do
})
# Start tracking the flow
flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at)
tuple = {client.id, resource.id}
flow_map =
Map.get(socket.assigns.flows, tuple, %{})
|> Map.put(flow_id, authorization_expires_at)
flows = Map.put(socket.assigns.flows, tuple, flow_map)
socket = assign(socket, flows: flows)
{:noreply, socket}
@@ -585,7 +631,21 @@ defmodule API.Gateway.Channel do
} do
flows =
Flows.all_gateway_flows_for_cache!(socket.assigns.gateway)
|> Map.new()
# Reduces [ {client_id, resource_id}, {flow_id, inserted_at} ]
#
# to %{ {client_id, resource_id} => %{flow_id => expires_at} }
#
# This data structure is used to efficiently:
# 1. Check if there are any active flows remaining for this client/resource?
# 2. Remove a deleted flow
|> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, inserted_at}}, acc ->
# Assume all flows have a 14 day expiration if they still exist
expires_at = DateTime.add(inserted_at, 14, :day)
flow_id_map = Map.get(acc, {client_id, resource_id}, %{})
Map.put(acc, {client_id, resource_id}, Map.put(flow_id_map, flow_id, expires_at))
end)
assign(socket, flows: flows)
end

View File

@@ -1,7 +1,7 @@
defmodule API.Gateway.Views.Flow do
def render_many(flows) do
flows
|> Enum.map(fn {{client_id, resource_id}, _expires_at} ->
|> Enum.map(fn {{client_id, resource_id}, _flow_map} ->
%{
client_id: client_id,
resource_id: resource_id

View File

@@ -145,11 +145,19 @@ defmodule API.Gateway.ChannelTest do
test "pushes allow_access message", %{
client: client,
account: account,
gateway: gateway,
resource: resource,
relay: relay,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -164,6 +172,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -206,6 +215,13 @@ defmodule API.Gateway.ChannelTest do
connections: [%{gateway_group_id: internet_gateway_group.id}]
)
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -220,6 +236,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -239,7 +256,81 @@ defmodule API.Gateway.ChannelTest do
assert DateTime.from_unix!(payload.expires_at) == DateTime.truncate(expires_at, :second)
end
test "handles flow deletion event", %{
test "does not send reject_access if another flow is granting access to the same client and resource",
%{
account: account,
client: client,
resource: resource,
gateway: gateway,
relay: relay,
socket: socket,
subject: subject
} do
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
client_payload = "RTC_SD_or_DNS_Q"
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
flow1 =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client,
resource: resource
)
flow2 =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client,
resource: resource
)
data = %{
"id" => flow1.id,
"client_id" => client.id,
"resource_id" => resource.id,
"account_id" => account.id
}
send(
socket.channel_pid,
{{:allow_access, gateway.id}, {channel_pid, socket_ref},
%{
client: client,
resource: resource,
flow_id: flow1.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
)
assert_push "allow_access", %{}
send(
socket.channel_pid,
{{:allow_access, gateway.id}, {channel_pid, socket_ref},
%{
client: client,
resource: resource,
flow_id: flow2.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
)
assert_push "allow_access", %{}
Events.Hooks.Flows.on_delete(data)
refute_push "reject_access", %{}
end
test "handles flow deletion event when access is removed", %{
account: account,
client: client,
resource: resource,
@@ -261,14 +352,16 @@ defmodule API.Gateway.ChannelTest do
account: account,
subject: subject,
client: client,
resource: resource
resource: resource,
gateway: gateway
)
data = %{
"id" => flow.id,
"client_id" => client.id,
"resource_id" => resource.id,
"account_id" => account.id
"account_id" => account.id,
"gateway_id" => gateway.id
}
send(
@@ -277,6 +370,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -313,6 +407,15 @@ defmodule API.Gateway.ChannelTest do
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client,
resource: resource,
gateway: gateway
)
other_client = Fixtures.Clients.create_client(account: account, subject: subject)
other_resource =
@@ -326,7 +429,8 @@ defmodule API.Gateway.ChannelTest do
account: account,
subject: subject,
client: other_client,
resource: resource
resource: resource,
gateway: gateway
)
other_flow2 =
@@ -334,7 +438,8 @@ defmodule API.Gateway.ChannelTest do
account: account,
subject: subject,
client: client,
resource: other_resource
resource: other_resource,
gateway: gateway
)
# Build up flow cache
@@ -344,6 +449,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -357,6 +463,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: other_client,
resource: resource,
flow_id: other_flow1.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -370,6 +477,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: other_resource,
flow_id: other_flow2.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -381,16 +489,17 @@ defmodule API.Gateway.ChannelTest do
:sys.get_state(socket.channel_pid)
assert flows == %{
{client.id, resource.id} => expires_at,
{other_client.id, resource.id} => expires_at,
{client.id, other_resource.id} => expires_at
{client.id, resource.id} => %{flow.id => expires_at},
{other_client.id, resource.id} => %{other_flow1.id => expires_at},
{client.id, other_resource.id} => %{other_flow2.id => expires_at}
}
data = %{
"id" => other_flow1.id,
"client_id" => other_flow1.client_id,
"resource_id" => other_flow1.resource_id,
"account_id" => other_flow1.account_id
"account_id" => other_flow1.account_id,
"gateway_id" => other_flow1.gateway_id
}
Events.Hooks.Flows.on_delete(data)
@@ -407,7 +516,8 @@ defmodule API.Gateway.ChannelTest do
"id" => other_flow2.id,
"client_id" => other_flow2.client_id,
"resource_id" => other_flow2.resource_id,
"account_id" => other_flow2.account_id
"account_id" => other_flow2.account_id,
"gateway_id" => other_flow2.gateway_id
}
Events.Hooks.Flows.on_delete(data)
@@ -425,11 +535,19 @@ defmodule API.Gateway.ChannelTest do
test "ignores other resource updates", %{
client: client,
account: account,
gateway: gateway,
resource: resource,
relay: relay,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -443,6 +561,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -462,10 +581,11 @@ defmodule API.Gateway.ChannelTest do
client_id = client.id
resource_id = resource.id
flow_id = flow.id
assert %{
assigns: %{
flows: %{{^client_id, ^resource_id} => ^expires_at}
flows: %{{^client_id, ^resource_id} => %{^flow_id => ^expires_at}}
}
} = :sys.get_state(socket.channel_pid)
@@ -475,10 +595,18 @@ defmodule API.Gateway.ChannelTest do
test "sends resource_updated when filters change", %{
client: client,
gateway: gateway,
account: account,
resource: resource,
relay: relay,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -493,6 +621,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload
}}
@@ -700,11 +829,19 @@ defmodule API.Gateway.ChannelTest do
test "pushes request_connection message", %{
client: client,
account: account,
resource: resource,
gateway: gateway,
global_relay: relay,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -720,6 +857,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload,
client_preshared_key: preshared_key
@@ -782,7 +920,8 @@ defmodule API.Gateway.ChannelTest do
account: account,
subject: subject,
client: client,
resource: resource
resource: resource,
gateway: gateway
)
send(
@@ -791,6 +930,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: client_payload,
client_preshared_key: preshared_key
@@ -803,7 +943,8 @@ defmodule API.Gateway.ChannelTest do
"id" => flow.id,
"client_id" => client.id,
"resource_id" => resource.id,
"account_id" => account.id
"account_id" => account.id,
"gateway_id" => gateway.id
}
Events.Hooks.Flows.on_delete(data)
@@ -819,10 +960,18 @@ defmodule API.Gateway.ChannelTest do
test "pushes authorize_flow message", %{
client: client,
account: account,
gateway: gateway,
resource: resource,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -839,6 +988,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
@@ -881,8 +1031,16 @@ defmodule API.Gateway.ChannelTest do
client: client,
gateway: gateway,
resource: resource,
account: account,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
preshared_key = "PSK"
@@ -898,6 +1056,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: nil,
ice_credentials: ice_credentials,
preshared_key: preshared_key
@@ -930,7 +1089,8 @@ defmodule API.Gateway.ChannelTest do
account: account,
subject: subject,
client: client,
resource: resource
resource: resource,
gateway: gateway
)
send(
@@ -939,6 +1099,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
@@ -951,7 +1112,8 @@ defmodule API.Gateway.ChannelTest do
"id" => flow.id,
"client_id" => client.id,
"resource_id" => resource.id,
"account_id" => account.id
"account_id" => account.id,
"gateway_id" => gateway.id
}
Events.Hooks.Flows.on_delete(data)
@@ -974,10 +1136,18 @@ defmodule API.Gateway.ChannelTest do
test "flow_authorized forwards reply to the client channel", %{
client: client,
account: account,
resource: resource,
gateway: gateway,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -1000,6 +1170,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
@@ -1038,11 +1209,19 @@ defmodule API.Gateway.ChannelTest do
test "connection ready forwards RFC session description to the client channel", %{
client: client,
account: account,
resource: resource,
relay: relay,
gateway: gateway,
socket: socket
} do
flow =
Fixtures.Flows.create_flow(
account: account,
client: client,
resource: resource
)
channel_pid = self()
socket_ref = make_ref()
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
@@ -1059,6 +1238,7 @@ defmodule API.Gateway.ChannelTest do
%{
client: client,
resource: resource,
flow_id: flow.id,
authorization_expires_at: expires_at,
client_payload: payload,
client_preshared_key: preshared_key

View File

@@ -26,10 +26,9 @@ defmodule Domain.Flows.Flow.Query do
cutoff = DateTime.utc_now() |> DateTime.add(-14, :day)
where(queryable, [flows: flows], flows.inserted_at > ^cutoff)
|> group_by([flows: flows], [flows.client_id, flows.resource_id])
|> select(
[flows: flows],
{{flows.client_id, flows.resource_id}, max(flows.inserted_at)}
{{flows.client_id, flows.resource_id}, {flows.id, flows.inserted_at}}
)
end

View File

@@ -420,7 +420,7 @@ defmodule Domain.FlowsTest do
end
describe "all_gateway_flows_for_cache!/1" do
test "returns the later of two client_id/resource_id pair", %{
test "returns all flows for client_id/resource_id pair", %{
account: account,
client: client,
gateway: gateway,
@@ -456,32 +456,10 @@ defmodule Domain.FlowsTest do
assert DateTime.compare(flow2.inserted_at, flow1.inserted_at) == :gt
assert [{{flow2.client_id, flow2.resource_id}, flow2.inserted_at}] ==
Flows.all_gateway_flows_for_cache!(gateway)
end
flows = all_gateway_flows_for_cache!(gateway)
test "returns flow when only one unique one exists", %{
account: account,
client: client,
gateway: gateway,
membership: membership,
resource: resource,
policy: policy,
subject: subject
} do
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client,
actor_group_membership: membership,
policy: policy,
resource: resource,
gateway: gateway
)
assert [{{flow.client_id, flow.resource_id}, flow.inserted_at}] ==
Flows.all_gateway_flows_for_cache!(gateway)
assert {{flow1.client_id, flow1.resource_id}, {flow1.id, flow1.inserted_at}} in flows
assert {{flow2.client_id, flow2.resource_id}, {flow2.id, flow2.inserted_at}} in flows
end
end