From 488cb96469bab18cefb6ccdb1871c1fdff091309 Mon Sep 17 00:00:00 2001 From: Jamil Date: Mon, 21 Jul 2025 16:12:05 -0400 Subject: [PATCH] fix(portal): don't prematurely reject access (#9952) Before: - When a flow was deleted, we flapped the resource on the client, and sent `reject_access` naively for the flow's `{client_id, resource_id}` pair on the gateway. This resulted in lots of unneeded resource flappage on the client whenever bulk flow deletions happened. After: - When a flow is deleted, we check if this is an active flow for the client. If so, we flap the resource then in order to trigger generation of a new flow. If access was truly affected, that results in a loss of a resource, we will push `resource_deleted` for the update that triggered the flow deletion (for example the resource/policy removal). On the gateway, we only send `reject_access` if it was the last flow granting access for a particular `client/resource` tuple. Why: - While the access state is still correct in the previous implementation, we run the possibility of pushing way too many resource flaps to the client in an overly eager attempt to remove access the client may not have access to. cc @thomaseizinger Related: https://firezonehq.slack.com/archives/C08FPHECLUF/p1753101115735179 --- elixir/apps/api/lib/api/client/channel.ex | 60 +++-- elixir/apps/api/lib/api/gateway/channel.ex | 86 +++++-- elixir/apps/api/lib/api/gateway/views/flow.ex | 2 +- .../api/test/api/gateway/channel_test.exs | 210 ++++++++++++++++-- .../domain/lib/domain/flows/flow/query.ex | 3 +- elixir/apps/domain/test/domain/flows_test.exs | 30 +-- 6 files changed, 317 insertions(+), 74 deletions(-) diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index a2bc1da71..ee9264c83 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -63,11 +63,14 @@ defmodule API.Client.Channel do # Called immediately after the client joins the channel def handle_info(:after_join, socket) do - # Initialize the cache + # Initialize the cache. Flows contains active flows for this client, so we + # can toggle the affected resource if the active flow is deleted. That will allow + # the client to create a new flow if the resource is still authorized. socket = socket |> hydrate_policies_and_resources() |> hydrate_membership_group_ids() + |> assign(flows: MapSet.new()) # Initialize relays {:ok, relays} = select_relays(socket) @@ -243,20 +246,34 @@ defmodule API.Client.Channel do # FLOWS - # If we're authorized for a resource for this flow, we need to push resource_deleted - # followed by resource_created_or_updated in order to reset the access state on the - # client. - def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do - # 1. Check if this possibly affects us - if resource = socket.assigns.resources[flow.resource_id] do - # 1. If so, check if we're currently authorized for this resource - if resource.id in Enum.map(authorized_resources(socket), & &1.id) do - push(socket, "resource_deleted", resource.id) - push(socket, "resource_created_or_updated", Views.Resource.render(resource)) - end - end + def handle_info( + {:deleted, %Flows.Flow{client_id: client_id} = flow}, + %{assigns: %{client: %{id: id}}} = socket + ) + when client_id == id do + if MapSet.member?(socket.assigns.flows, flow.id) do + # If an active flow is deleted, we need to recreate it. + # To do that, we need to flap the resource on the client because it doesn't track flows. + # The gateway is also tracking flows and will have sent a reject_access for this client/resource + # if this was the last flow in its cache that was authorizing it. + push(socket, "resource_deleted", flow.resource_id) - {:noreply, socket} + resource = Map.get(socket.assigns.resources, flow.resource_id) + + # Access to resource is still allowed, allow creating a new flow + if not is_nil(resource) and resource.id in Enum.map(authorized_resources(socket), & &1.id) do + push(socket, "resource_created_or_updated", Views.Resource.render(resource)) + else + Logger.warning("Active flow deleted for resource but resource not found in socket state", + resource_id: flow.resource_id, + flow_id: flow.id + ) + end + + {:noreply, assign(socket, flows: MapSet.delete(socket.assigns.flows, flow.id))} + else + {:noreply, socket} + end end # GATEWAY_GROUPS @@ -708,7 +725,7 @@ defmodule API.Client.Channel do # TODO: Optimization # Move this to a Task.start that completes after broadcasting authorize_flow - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -727,12 +744,15 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> @@ -860,7 +880,7 @@ defmodule API.Client.Channel do {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -876,11 +896,14 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: payload }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> @@ -917,7 +940,7 @@ defmodule API.Client.Channel do {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do # TODO: Optimization - {:ok, _flow} = + {:ok, flow} = Flows.create_flow( socket.assigns.client, gateway, @@ -933,12 +956,15 @@ defmodule API.Client.Channel do %{ client: socket.assigns.client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key }} ) + socket = assign(socket, flows: MapSet.put(socket.assigns.flows, flow.id)) + {:noreply, socket} else {:error, :not_found} -> diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 2ec1bf619..f6d2eac4a 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -55,11 +55,23 @@ defmodule API.Gateway.Channel do now = DateTime.utc_now() - # Reject flows older than 14 days + # 1. Remove individual flows older than 14 days, then remove access entry if no flows left flows = socket.assigns.flows - |> Enum.reject(fn {_tuple, expires_at} -> DateTime.compare(expires_at, now) == :lt end) + |> Enum.map(fn {tuple, flow_id_map} -> + flow_id_map = + Enum.reject(flow_id_map, fn {_flow_id, expires_at} -> + DateTime.compare(expires_at, now) == :lt + end) + |> Enum.into(%{}) + + {tuple, flow_id_map} + end) |> Enum.into(%{}) + |> Enum.reject(fn {_tuple, flow_id_map} -> map_size(flow_id_map) == 0 end) + |> Enum.into(%{}) + + # The gateway has its own flow expiration, so no need to send `reject_access` {:noreply, assign(socket, flows: flows)} end @@ -89,17 +101,30 @@ defmodule API.Gateway.Channel do # FLOWS - def handle_info({:deleted, %Flows.Flow{} = flow}, socket) do + def handle_info( + {:deleted, %Flows.Flow{gateway_id: gateway_id} = flow}, + %{ + assigns: %{gateway: %{id: id}} + } = socket + ) + when gateway_id == id do tuple = {flow.client_id, flow.resource_id} socket = - if Map.has_key?(socket.assigns.flows, tuple) do - push(socket, "reject_access", %{ - client_id: flow.client_id, - resource_id: flow.resource_id - }) + if flow_map = Map.get(socket.assigns.flows, tuple) do + flow_map = Map.delete(flow_map, flow.id) - assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + if map_size(flow_map) == 0 do + # Send reject_access if this was the last flow granting access for this client/resource + push(socket, "reject_access", %{ + client_id: flow.client_id, + resource_id: flow.resource_id + }) + + assign(socket, flows: Map.delete(socket.assigns.flows, tuple)) + else + assign(socket, flows: Map.put(socket.assigns.flows, tuple, flow_map)) + end else socket end @@ -275,6 +300,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -303,7 +329,13 @@ defmodule API.Gateway.Channel do }) # Start tracking flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -318,6 +350,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, client_payload: payload } = attrs @@ -346,7 +379,13 @@ defmodule API.Gateway.Channel do }) # Start tracking the flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -365,6 +404,7 @@ defmodule API.Gateway.Channel do %{ client: client, resource: resource, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, client_payload: payload, client_preshared_key: preshared_key @@ -391,7 +431,13 @@ defmodule API.Gateway.Channel do }) # Start tracking the flow - flows = Map.put(socket.assigns.flows, {client.id, resource.id}, authorization_expires_at) + tuple = {client.id, resource.id} + + flow_map = + Map.get(socket.assigns.flows, tuple, %{}) + |> Map.put(flow_id, authorization_expires_at) + + flows = Map.put(socket.assigns.flows, tuple, flow_map) socket = assign(socket, flows: flows) {:noreply, socket} @@ -585,7 +631,21 @@ defmodule API.Gateway.Channel do } do flows = Flows.all_gateway_flows_for_cache!(socket.assigns.gateway) - |> Map.new() + # Reduces [ {client_id, resource_id}, {flow_id, inserted_at} ] + # + # to %{ {client_id, resource_id} => %{flow_id => expires_at} } + # + # This data structure is used to efficiently: + # 1. Check if there are any active flows remaining for this client/resource? + # 2. Remove a deleted flow + |> Enum.reduce(%{}, fn {{client_id, resource_id}, {flow_id, inserted_at}}, acc -> + # Assume all flows have a 14 day expiration if they still exist + expires_at = DateTime.add(inserted_at, 14, :day) + + flow_id_map = Map.get(acc, {client_id, resource_id}, %{}) + + Map.put(acc, {client_id, resource_id}, Map.put(flow_id_map, flow_id, expires_at)) + end) assign(socket, flows: flows) end diff --git a/elixir/apps/api/lib/api/gateway/views/flow.ex b/elixir/apps/api/lib/api/gateway/views/flow.ex index 4e614fd0e..03b776fac 100644 --- a/elixir/apps/api/lib/api/gateway/views/flow.ex +++ b/elixir/apps/api/lib/api/gateway/views/flow.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Views.Flow do def render_many(flows) do flows - |> Enum.map(fn {{client_id, resource_id}, _expires_at} -> + |> Enum.map(fn {{client_id, resource_id}, _flow_map} -> %{ client_id: client_id, resource_id: resource_id diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 43752368b..b0b7b4148 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -145,11 +145,19 @@ defmodule API.Gateway.ChannelTest do test "pushes allow_access message", %{ client: client, + account: account, gateway: gateway, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -164,6 +172,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -206,6 +215,13 @@ defmodule API.Gateway.ChannelTest do connections: [%{gateway_group_id: internet_gateway_group.id}] ) + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -220,6 +236,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -239,7 +256,81 @@ defmodule API.Gateway.ChannelTest do assert DateTime.from_unix!(payload.expires_at) == DateTime.truncate(expires_at, :second) end - test "handles flow deletion event", %{ + test "does not send reject_access if another flow is granting access to the same client and resource", + %{ + account: account, + client: client, + resource: resource, + gateway: gateway, + relay: relay, + socket: socket, + subject: subject + } do + channel_pid = self() + socket_ref = make_ref() + expires_at = DateTime.utc_now() |> DateTime.add(30, :second) + client_payload = "RTC_SD_or_DNS_Q" + + stamp_secret = Ecto.UUID.generate() + :ok = Domain.Relays.connect_relay(relay, stamp_secret) + + flow1 = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource + ) + + flow2 = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource + ) + + data = %{ + "id" => flow1.id, + "client_id" => client.id, + "resource_id" => resource.id, + "account_id" => account.id + } + + send( + socket.channel_pid, + {{:allow_access, gateway.id}, {channel_pid, socket_ref}, + %{ + client: client, + resource: resource, + flow_id: flow1.id, + authorization_expires_at: expires_at, + client_payload: client_payload + }} + ) + + assert_push "allow_access", %{} + + send( + socket.channel_pid, + {{:allow_access, gateway.id}, {channel_pid, socket_ref}, + %{ + client: client, + resource: resource, + flow_id: flow2.id, + authorization_expires_at: expires_at, + client_payload: client_payload + }} + ) + + assert_push "allow_access", %{} + + Events.Hooks.Flows.on_delete(data) + + refute_push "reject_access", %{} + end + + test "handles flow deletion event when access is removed", %{ account: account, client: client, resource: resource, @@ -261,14 +352,16 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) data = %{ "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } send( @@ -277,6 +370,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -313,6 +407,15 @@ defmodule API.Gateway.ChannelTest do stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource, + gateway: gateway + ) + other_client = Fixtures.Clients.create_client(account: account, subject: subject) other_resource = @@ -326,7 +429,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: other_client, - resource: resource + resource: resource, + gateway: gateway ) other_flow2 = @@ -334,7 +438,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: other_resource + resource: other_resource, + gateway: gateway ) # Build up flow cache @@ -344,6 +449,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -357,6 +463,7 @@ defmodule API.Gateway.ChannelTest do %{ client: other_client, resource: resource, + flow_id: other_flow1.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -370,6 +477,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: other_resource, + flow_id: other_flow2.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -381,16 +489,17 @@ defmodule API.Gateway.ChannelTest do :sys.get_state(socket.channel_pid) assert flows == %{ - {client.id, resource.id} => expires_at, - {other_client.id, resource.id} => expires_at, - {client.id, other_resource.id} => expires_at + {client.id, resource.id} => %{flow.id => expires_at}, + {other_client.id, resource.id} => %{other_flow1.id => expires_at}, + {client.id, other_resource.id} => %{other_flow2.id => expires_at} } data = %{ "id" => other_flow1.id, "client_id" => other_flow1.client_id, "resource_id" => other_flow1.resource_id, - "account_id" => other_flow1.account_id + "account_id" => other_flow1.account_id, + "gateway_id" => other_flow1.gateway_id } Events.Hooks.Flows.on_delete(data) @@ -407,7 +516,8 @@ defmodule API.Gateway.ChannelTest do "id" => other_flow2.id, "client_id" => other_flow2.client_id, "resource_id" => other_flow2.resource_id, - "account_id" => other_flow2.account_id + "account_id" => other_flow2.account_id, + "gateway_id" => other_flow2.gateway_id } Events.Hooks.Flows.on_delete(data) @@ -425,11 +535,19 @@ defmodule API.Gateway.ChannelTest do test "ignores other resource updates", %{ client: client, + account: account, gateway: gateway, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -443,6 +561,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -462,10 +581,11 @@ defmodule API.Gateway.ChannelTest do client_id = client.id resource_id = resource.id + flow_id = flow.id assert %{ assigns: %{ - flows: %{{^client_id, ^resource_id} => ^expires_at} + flows: %{{^client_id, ^resource_id} => %{^flow_id => ^expires_at}} } } = :sys.get_state(socket.channel_pid) @@ -475,10 +595,18 @@ defmodule API.Gateway.ChannelTest do test "sends resource_updated when filters change", %{ client: client, gateway: gateway, + account: account, resource: resource, relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -493,6 +621,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload }} @@ -700,11 +829,19 @@ defmodule API.Gateway.ChannelTest do test "pushes request_connection message", %{ client: client, + account: account, resource: resource, gateway: gateway, global_relay: relay, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -720,6 +857,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key @@ -782,7 +920,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) send( @@ -791,6 +930,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: client_payload, client_preshared_key: preshared_key @@ -803,7 +943,8 @@ defmodule API.Gateway.ChannelTest do "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } Events.Hooks.Flows.on_delete(data) @@ -819,10 +960,18 @@ defmodule API.Gateway.ChannelTest do test "pushes authorize_flow message", %{ client: client, + account: account, gateway: gateway, resource: resource, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -839,6 +988,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -881,8 +1031,16 @@ defmodule API.Gateway.ChannelTest do client: client, gateway: gateway, resource: resource, + account: account, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() preshared_key = "PSK" @@ -898,6 +1056,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: nil, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -930,7 +1089,8 @@ defmodule API.Gateway.ChannelTest do account: account, subject: subject, client: client, - resource: resource + resource: resource, + gateway: gateway ) send( @@ -939,6 +1099,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -951,7 +1112,8 @@ defmodule API.Gateway.ChannelTest do "id" => flow.id, "client_id" => client.id, "resource_id" => resource.id, - "account_id" => account.id + "account_id" => account.id, + "gateway_id" => gateway.id } Events.Hooks.Flows.on_delete(data) @@ -974,10 +1136,18 @@ defmodule API.Gateway.ChannelTest do test "flow_authorized forwards reply to the client channel", %{ client: client, + account: account, resource: resource, gateway: gateway, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -1000,6 +1170,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key @@ -1038,11 +1209,19 @@ defmodule API.Gateway.ChannelTest do test "connection ready forwards RFC session description to the client channel", %{ client: client, + account: account, resource: resource, relay: relay, gateway: gateway, socket: socket } do + flow = + Fixtures.Flows.create_flow( + account: account, + client: client, + resource: resource + ) + channel_pid = self() socket_ref = make_ref() expires_at = DateTime.utc_now() |> DateTime.add(30, :second) @@ -1059,6 +1238,7 @@ defmodule API.Gateway.ChannelTest do %{ client: client, resource: resource, + flow_id: flow.id, authorization_expires_at: expires_at, client_payload: payload, client_preshared_key: preshared_key diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index 3a60303bf..7edc048bd 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -26,10 +26,9 @@ defmodule Domain.Flows.Flow.Query do cutoff = DateTime.utc_now() |> DateTime.add(-14, :day) where(queryable, [flows: flows], flows.inserted_at > ^cutoff) - |> group_by([flows: flows], [flows.client_id, flows.resource_id]) |> select( [flows: flows], - {{flows.client_id, flows.resource_id}, max(flows.inserted_at)} + {{flows.client_id, flows.resource_id}, {flows.id, flows.inserted_at}} ) end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 52af61573..6b9a2e68d 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -420,7 +420,7 @@ defmodule Domain.FlowsTest do end describe "all_gateway_flows_for_cache!/1" do - test "returns the later of two client_id/resource_id pair", %{ + test "returns all flows for client_id/resource_id pair", %{ account: account, client: client, gateway: gateway, @@ -456,32 +456,10 @@ defmodule Domain.FlowsTest do assert DateTime.compare(flow2.inserted_at, flow1.inserted_at) == :gt - assert [{{flow2.client_id, flow2.resource_id}, flow2.inserted_at}] == - Flows.all_gateway_flows_for_cache!(gateway) - end + flows = all_gateway_flows_for_cache!(gateway) - test "returns flow when only one unique one exists", %{ - account: account, - client: client, - gateway: gateway, - membership: membership, - resource: resource, - policy: policy, - subject: subject - } do - flow = - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client, - actor_group_membership: membership, - policy: policy, - resource: resource, - gateway: gateway - ) - - assert [{{flow.client_id, flow.resource_id}, flow.inserted_at}] == - Flows.all_gateway_flows_for_cache!(gateway) + assert {{flow1.client_id, flow1.resource_id}, {flow1.id, flow1.inserted_at}} in flows + assert {{flow2.client_id, flow2.resource_id}, {flow2.id, flow2.inserted_at}} in flows end end