diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index a2bc1da71..ee9264c83 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -63,11 +63,14 @@ defmodule API.Client.Channel do # Called immediately after the client joins the channel def handle_info(:after_join, socket) do - # Initialize the cache + # Initialize the cache. Flows contains active flows for this client, so we + # can toggle the affected resource if the active flow is deleted. That will allow + # the client to create a new flow if the resource is still authorized. socket = socket |> hydrate_policies_and_resources() |> hydrate_membership_group_ids() + |> assign(flows: MapSet.new()) # Initialize relays {:ok, relays} = select_relays(socket) @@ -243,20 +246,34 @@ defmodule API.Client.Channel do # FLOWS - # If we're authorized for a resource for this flow, we need to push resource_deleted - # followed by resource_created_or_updated in order to reset the access state on the - # client. - def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do - # 1. Check if this possibly affects us - if resource = socket.assigns.resources[flow.resource_id] do - # 1. If so, check if we're currently authorized for this resource - if resource.id in Enum.map(authorized_resources(socket), & &1.id) do - push(socket, "resource_deleted", resource.id) - push(socket, "resource_created_or_updated", Views.Resource.render(resource)) - end - end + def handle_info( + {:deleted, %Flows.Flow{client_id: client_id} = flow}, + %{assigns: %{client: %{id: id}}} = socket + ) + when client_id == id do + if MapSet.member?(socket.assigns.flows, flow.id) do + # If an active flow is deleted, we need to recreate it. + # To do that, we need to flap the resource on the client because it doesn't track flows. + # The gateway is also tracking flows and will have sent a reject_access for this client/resource + # if this was the last flow in its cache that was authorizing it. + push(socket, "resource_deleted", flow.resource_id) - {:noreply, socket} + resource = Map.get(socket.assigns.resources, flow.resource_id) + + # Access to resource is still allowed, allow creating a new flow + if not is_nil(resource) and resource.id in Enum.map(authorized_resources(socket), & &1.id) do + push(socket, "resource_created_or_updated", Views.Resource.render(resource)) + else + Logger.warning("Active flow deleted for resource but resource not found in socket state", + resource_id: flow.resource_id, + flow_id: flow.id + ) + end + + {:noreply, assign(socket, flows: MapSet.delete(socket.assigns.flows, flow.id))} + else + {:noreply, socket} + end end # GATEWAY_GROUPS @@ -708,7 +725,7 @@ defmodule API.Client.Channel do # TODO: Optimization # Move this to a Task.start that completes after broadcasting authorize_flow - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -727,12 +744,15 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> @@ -860,7 +880,7 @@ defmodule API.Client.Channel do {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -876,11 +896,14 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: payload }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> @@ -917,7 +940,7 @@ defmodule API.Client.Channel do {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -933,12 +956,15 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 2ec1bf619..f6d2eac4a 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -55,11 +55,23 @@ defmodule API.Gateway.Channel do now = DateTime.utc_now() - # Reject flows older than 14 days + # 1. Remove individual flows older than 14 days, then remove access entry if no flows left flows = socket.assigns.flows - |> Enum.reject(fn {_tuple, expires_at} -> DateTime.compare(expires_at, now) == :lt end) + |> Enum.map(fn {tuple, flow_id_map} -> + flow_id_map = + Enum.reject(flow_id_map, fn {_flow_id, expires_at} -> + DateTime.compare(expires_at, now) == :lt + end) + |> Enum.into(%{}) + + {tuple, flow_id_map} + end) |> Enum.into(%{}) + |> Enum.reject(fn {_tuple, flow_id_map} -> map_size(flow_id_map) == 0 end) + |> Enum.into(%{}) + + # The gateway has its own flow expiration, so no need to send `reject_access` {:noreply, assign(socket, flows: flows)} end @@ -89,17 +101,30 @@ defmodule API.Gateway.Channel do # FLOWS - def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do + def handle_info( + {:deleted, %Flows.Flow{gateway_id: gateway_id} = flow}, + %{ + assigns: %{gateway: %{id: id}} + } = socket + ) + when gateway_id == id do tuple = {flow.client_id, flow.resource_id} socket = - if Map.has_key?(socket.assigns.flows, tuple) do - push(socket, "reject_access", %{ - client_id: flow.client_id, - resource_id: flow.resource_id - }) + if flow_map = Map.get(socket.assigns.flows, tuple) do + flow_map = Map.delete(flow_map, flow.id) - assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + if map_size(flow_map) == 0 do + # Send reject_access if this was the last flow granting access for this client/resource + push(socket, "reject_access", %{ + client_id: flow.client_id, + resource_id: flow.resource_id + }) + + assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + else + assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map)) + end else socket end @@ -275,6 +300,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -303,7 +329,13 @@ defmodule API.Gateway.Channel do }) # Start tracking flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -318,6 +350,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, client_payload: payload } = attrs @@ -346,7 +379,13 @@ defmodule API.Gateway.Channel do }) # Start tracking the flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -365,6 +404,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, client_payload: payload, client_preshared_key: preshared_key @@ -391,7 +431,13 @@ defmodule API.Gateway.Channel do }) # Start tracking the flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -585,7 +631,21 @@ defmodule API.Gateway.Channel do } do flows = Flows.all_gateway_flows_for_cache!(socket.assigns.gateway) - |> Map.new() + # Reduces [ {client_id, resource_id}, {flow_id, inserted_at} ] + # + # to %{ {client_id, resource_id} => %{flow_id => expires_at} } + # + # This data structure is used to efficiently: + # 1. Check if there are any active flows remaining for this client/resource? + # 2. Remove a deleted flow + |> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, inserted_at}}, acc -> + # Assume all flows have a 14 day expiration if they still exist + expires_at = DateTime.add(inserted_at, 14, :day) + + flow_id_map = Map.get(acc, {client_id, resource_id}, %{}) + + Map.put(acc, {client_id, resource_id}, Map.put(flow_id_map, flow_id, expires_at)) + end) assign(socket, flows: flows) end diff --git a/elixir/apps/api/lib/api/gateway/views/flow.ex b/elixir/apps/api/lib/api/gateway/views/flow.ex index 4e614fd0e..03b776fac 100644 --- a/elixir/apps/api/lib/api/gateway/views/flow.ex +++ b/elixir/apps/api/lib/api/gateway/views/flow.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Views.Flow do def render_many(flows) do flows - |> Enum.map(fn {{client_id, resource_id}, _expires_at} -> + |> Enum.map(fn {{client_id, resource_id}, _flow_map} -> %{ client_id: client_id, resource_id: resource_id diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 43752368b..b0b7b4148 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -145,11 +145,19 @@ defmodule API.Gateway.ChannelTest do test "pushes allow_access message", %{ client: client, + account: account, gateway: gateway, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -164,6 +172,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -206,6 +215,13 @@ defmodule API.Gateway.ChannelTest do connections: [%{gateway_group_id: internet_gateway_group.id}] ) + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -220,6 +236,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -239,7 +256,81 @@ defmodule API.Gateway.ChannelTest do assert DateTime.from_unix!(payload.expires_at) == DateTime.truncate(expires_at, :second) end - test "handles flow deletion event", %{ + test "does not send reject_access if another flow is granting access to the same client and resource", + %{ + account: account, + client: client, + resource: resource, + gateway: gateway, + relay: relay, + socket: socket, + subject: subject + } do + channel_pid = self() + socket_ref = make_ref() + expires_at = DateTime.utc_now() |> DateTime.add(30, :second) + client_payload = "RTC_SD_or_DNS_Q" + + stamp_secret = Ecto.UUID.generate() + :ok = Domain.Relays.connect_relay(relay, stamp_secret) + + flow1 = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource + ) + + flow2 = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource + ) + + data = %{ + "id" => flow1.id, + "client_id" => client.id, + "resource_id" => resource.id, + "account_id" => account.id + } + + send( + socket.channel_pid, + {{:allow_access, gateway.id}, {channel_pid, socket_ref}, + %{ + client: client, + resource: resource, + flow_id: flow1.id, + authorization_expires_at: expires_at, + client_payload: client_payload + }} + ) + + assert_push "allow_access", %{} + + send( + socket.channel_pid, + {{:allow_access, gateway.id}, {channel_pid, socket_ref}, + %{ + client: client, + resource: resource, + flow_id: flow2.id, + authorization_expires_at: expires_at, + client_payload: client_payload + }} + ) + + assert_push "allow_access", %{} + + Events.Hooks.Flows.on_delete(data) + + refute_push "reject_access", %{} + end + + test "handles flow deletion event when access is removed", %{ account: account, client: client, resource: resource, @@ -261,14 +352,16 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) data = %{ "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } send( @@ -277,6 +370,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -313,6 +407,15 @@ defmodule API.Gateway.ChannelTest do stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource, + gateway: gateway + ) + other_client = Fixtures.Clients.create_client(account: account, subject: subject) other_resource = @@ -326,7 +429,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: other_client, - resource: resource + resource: resource, + gateway: gateway ) other_flow2 = @@ -334,7 +438,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: other_resource + resource: other_resource, + gateway: gateway ) # Build up flow cache @@ -344,6 +449,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -357,6 +463,7 @@ defmodule API.Gateway.ChannelTest do %{ client: other_client, resource: resource, + flow_id: other_flow1.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -370,6 +477,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: other_resource, + flow_id: other_flow2.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -381,16 +489,17 @@ defmodule API.Gateway.ChannelTest do :sys.get_state(socket.channel_pid) assert flows == %{ - {client.id, resource.id} => expires_at, - {other_client.id, resource.id} => expires_at, - {client.id, other_resource.id} => expires_at + {client.id, resource.id} => %{flow.id => expires_at}, + {other_client.id, resource.id} => %{other_flow1.id => expires_at}, + {client.id, other_resource.id} => %{other_flow2.id => expires_at} } data = %{ "id" => other_flow1.id, "client_id" => other_flow1.client_id, "resource_id" => other_flow1.resource_id, - "account_id" => other_flow1.account_id + "account_id" => other_flow1.account_id, + "gateway_id" => other_flow1.gateway_id } Events.Hooks.Flows.on_delete(data) @@ -407,7 +516,8 @@ defmodule API.Gateway.ChannelTest do "id" => other_flow2.id, "client_id" => other_flow2.client_id, "resource_id" => other_flow2.resource_id, - "account_id" => other_flow2.account_id + "account_id" => other_flow2.account_id, + "gateway_id" => other_flow2.gateway_id } Events.Hooks.Flows.on_delete(data) @@ -425,11 +535,19 @@ defmodule API.Gateway.ChannelTest do test "ignores other resource updates", %{ client: client, + account: account, gateway: gateway, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -443,6 +561,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -462,10 +581,11 @@ defmodule API.Gateway.ChannelTest do client_id = client.id resource_id = resource.id + flow_id = flow.id assert %{ assigns: %{ - flows: %{{^client_id, ^resource_id} => ^expires_at} + flows: %{{^client_id, ^resource_id} => %{^flow_id => ^expires_at}} } } = :sys.get_state(socket.channel_pid) @@ -475,10 +595,18 @@ defmodule API.Gateway.ChannelTest do test "sends resource_updated when filters change", %{ client: client, gateway: gateway, + account: account, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -493,6 +621,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -700,11 +829,19 @@ defmodule API.Gateway.ChannelTest do test "pushes request_connection message", %{ client: client, + account: account, resource: resource, gateway: gateway, global_relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -720,6 +857,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key @@ -782,7 +920,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) send( @@ -791,6 +930,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key @@ -803,7 +943,8 @@ defmodule API.Gateway.ChannelTest do "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } Events.Hooks.Flows.on_delete(data) @@ -819,10 +960,18 @@ defmodule API.Gateway.ChannelTest do test "pushes authorize_flow message", %{ client: client, + account: account, gateway: gateway, resource: resource, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -839,6 +988,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -881,8 +1031,16 @@ defmodule API.Gateway.ChannelTest do client: client, gateway: gateway, resource: resource, + account: account, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() preshared_key = "PSK" @@ -898,6 +1056,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: nil, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -930,7 +1089,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) send( @@ -939,6 +1099,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -951,7 +1112,8 @@ defmodule API.Gateway.ChannelTest do "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } Events.Hooks.Flows.on_delete(data) @@ -974,10 +1136,18 @@ defmodule API.Gateway.ChannelTest do test "flow_authorized forwards reply to the client channel", %{ client: client, + account: account, resource: resource, gateway: gateway, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -1000,6 +1170,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -1038,11 +1209,19 @@ defmodule API.Gateway.ChannelTest do test "connection ready forwards RFC session description to the client channel", %{ client: client, + account: account, resource: resource, relay: relay, gateway: gateway, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -1059,6 +1238,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: payload, client_preshared_key: preshared_key diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index 3a60303bf..7edc048bd 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -26,10 +26,9 @@ defmodule Domain.Flows.Flow.Query do cutoff = DateTime.utc_now() |> DateTime.add(-14, :day) where(queryable, [flows: flows], flows.inserted_at > ^cutoff) - |> group_by([flows: flows], [flows.client_id, flows.resource_id]) |> select( [flows: flows], - {{flows.client_id, flows.resource_id}, max(flows.inserted_at)} + {{flows.client_id, flows.resource_id}, {flows.id, flows.inserted_at}} ) end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 52af61573..6b9a2e68d 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -420,7 +420,7 @@ defmodule Domain.FlowsTest do end describe "all_gateway_flows_for_cache!/1" do - test "returns the later of two client_id/resource_id pair", %{ + test "returns all flows for client_id/resource_id pair", %{ account: account, client: client, gateway: gateway, @@ -456,32 +456,10 @@ defmodule Domain.FlowsTest do assert DateTime.compare(flow2.inserted_at, flow1.inserted_at) == :gt - assert [{{flow2.client_id, flow2.resource_id}, flow2.inserted_at}] == - Flows.all_gateway_flows_for_cache!(gateway) - end + flows = all_gateway_flows_for_cache!(gateway) - test "returns flow when only one unique one exists", %{ - account: account, - client: client, - gateway: gateway, - membership: membership, - resource: resource, - policy: policy, - subject: subject - } do - flow = - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client, - actor_group_membership: membership, - policy: policy, - resource: resource, - gateway: gateway - ) - - assert [{{flow.client_id, flow.resource_id}, flow.inserted_at}] == - Flows.all_gateway_flows_for_cache!(gateway) + assert {{flow1.client_id, flow1.resource_id}, {flow1.id, flow1.inserted_at}} in flows + assert {{flow2.client_id, flow2.resource_id}, {flow2.id, flow2.inserted_at}} in flows end end