From 7c674ea21cc14b7c04440755542b0a664e382217 Mon Sep 17 00:00:00 2001 From: Jamil Date: Wed, 28 May 2025 23:35:03 -0700 Subject: [PATCH] refactor(portal): Move expire_flow to WAL broadcaster (#9286) Similar to #9285, we move the `expire_flow` event to be broadcasted from the WAL broadcaster. Unrelated tests needed to be updated to not expect to receive the broadcast, and instead check to ensure the record has been updated. A minor bug is also fixed in the ordering of the `old_data, data` fields. Tested manually on dev. Related: #6294 Related: #8187 --- elixir/apps/api/lib/api/gateway/channel.ex | 10 +-- .../api/test/api/gateway/channel_test.exs | 6 ++ elixir/apps/domain/lib/domain/events/event.ex | 4 +- .../domain/lib/domain/events/hooks/flows.ex | 64 +++++++++++++++- elixir/apps/domain/lib/domain/flows.ex | 28 +------ .../apps/domain/test/domain/actors_test.exs | 11 ++- .../jobs/sync_directory_test.exs | 11 +-- .../jumpcloud/jobs/sync_directory_test.exs | 11 +-- .../jobs/sync_directory_test.exs | 11 +-- .../okta/jobs/sync_directory_test.exs | 11 +-- elixir/apps/domain/test/domain/auth_test.exs | 5 +- .../apps/domain/test/domain/clients_test.exs | 9 +-- .../test/domain/events/hooks/flows_test.exs | 75 ++++++++++++++++++- elixir/apps/domain/test/domain/flows_test.exs | 10 +-- .../apps/domain/test/domain/policies_test.exs | 17 ++--- .../domain/test/domain/resources_test.exs | 16 ++-- 16 files changed, 187 insertions(+), 112 deletions(-) diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index dcb0939a2..8f92b3a8e 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Clients, Resources, Relays, Gateways, Flows} + alias Domain.{Clients, Events, Resources, Relays, Gateways, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -134,7 +134,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Flows.unsubscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.unsubscribe(flow_id) push(socket, "reject_access", %{ flow_id: flow_id, @@ -307,7 +307,7 @@ defmodule API.Gateway.Channel do } = payload OpenTelemetry.Tracer.with_span "gateway.authorize_flow" do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) Logger.debug("Gateway authorizes a new network flow", flow_id: flow_id, @@ -378,7 +378,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) client = Clients.fetch_client_by_id!(client_id) resource = Resources.fetch_resource_by_id!(resource_id) @@ -443,7 +443,7 @@ defmodule API.Gateway.Channel do } = attrs OpenTelemetry.Tracer.with_span "gateway.request_connection" do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) Logger.debug("Gateway received connection request message", client_id: client_id, diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index a4535058f..525ee7f35 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -220,6 +220,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, @@ -628,6 +630,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, @@ -844,6 +848,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, diff --git a/elixir/apps/domain/lib/domain/events/event.ex b/elixir/apps/domain/lib/domain/events/event.ex index 2ee3959d0..9fb065e79 100644 --- a/elixir/apps/domain/lib/domain/events/event.ex +++ b/elixir/apps/domain/lib/domain/events/event.ex @@ -9,7 +9,7 @@ defmodule Domain.Events.Event do it to the appropriate hook module for processing. """ def ingest(msg, relations) do - {op, tuple_data, old_tuple_data} = extract_msg_data(msg) + {op, old_tuple_data, tuple_data} = extract_msg_data(msg) {:ok, relation} = Map.fetch(relations, msg.relation_id) table = relation.name @@ -308,7 +308,7 @@ defmodule Domain.Events.Event do {:insert, nil, data} end - defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: data, tuple_data: old}) do + defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do {:update, old, data} end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex index 10317706a..a0367cb3c 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex @@ -1,13 +1,69 @@ defmodule Domain.Events.Hooks.Flows do + alias Domain.PubSub + require Logger + def on_insert(_data) do :ok end - def on_update(_old_data, _data) do - :ok + def on_update( + _old_data, + %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id, + "expires_at" => expires_at + } = _data + ) do + if expired?(expires_at) do + # Flow has become expired + broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) + else + :ok + end end - def on_delete(_old_data) do - :ok + # During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases + # where we might manually clear flows in a migration or some other mechanism. + def on_delete( + %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } = _old_data + ) do + broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) + end + + def subscribe(flow_id) do + flow_id + |> topic() + |> PubSub.subscribe() + end + + def unsubscribe(flow_id) do + flow_id + |> topic() + |> PubSub.unsubscribe() + end + + defp expired?(nil), do: false + + defp expired?(expires_at) do + with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do + DateTime.compare(DateTime.utc_now(), expires_at) == :gt + else + _ -> false + end + end + + defp topic(flow_id) do + "flows:#{flow_id}" + end + + defp broadcast(flow_id, payload) do + flow_id + |> topic() + |> PubSub.broadcast(payload) end end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index d944cb3b7..9255bc05f 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -1,5 +1,5 @@ defmodule Domain.Flows do - alias Domain.{Repo, PubSub} + alias Domain.Repo alias Domain.{Auth, Accounts, Actors, Clients, Gateways, Resources, Policies, Tokens} alias Domain.Flows.{Authorizer, Flow, Activity} require Ecto.Query @@ -242,32 +242,6 @@ defmodule Domain.Flows do |> Flow.Query.expire() |> Repo.update_all([]) - # TODO: WAL - :ok = - Enum.each(flows, fn flow -> - :ok = broadcast_flow_expiration_event(flow) - end) - {:ok, flows} end - - ### PubSub - - defp flow_topic(%Flow{} = flow), do: flow_topic(flow.id) - defp flow_topic(flow_id), do: "flows:#{flow_id}" - - def subscribe_to_flow_expiration_events(flow_or_id) do - flow_or_id |> flow_topic() |> PubSub.subscribe() - end - - def unsubscribe_to_flow_expiration_events(flow_or_id) do - flow_or_id |> flow_topic() |> PubSub.subscribe() - end - - # TODO: WAL - defp broadcast_flow_expiration_event(flow) do - flow - |> flow_topic() - |> PubSub.broadcast({:expire_flow, flow.id, flow.client_id, flow.resource_id}) - end end diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 565180377..835b14b5f 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -873,7 +873,6 @@ defmodule Domain.ActorsTest do :ok = Domain.Policies.subscribe_to_events_for_actor_group(group3) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group4) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group5) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(group1_flow) attrs_list = [ %{"name" => "Group:Infrastructure", "provider_identifier" => "G:GROUP_ID2"}, @@ -902,10 +901,11 @@ defmodule Domain.ActorsTest do group1_id = group1.id group1_policy_id = group1_policy.id - group1_flow_id = group1_flow.id assert_receive {:delete_membership, ^actor_id, ^group1_id} assert_receive {:reject_access, ^group1_policy_id, ^group1_id, _resource_id} - assert_receive {:expire_flow, ^group1_flow_id, _client_id, _resource_id} + + group1_flow = Repo.reload(group1_flow) + assert DateTime.compare(group1_flow.expires_at, DateTime.utc_now()) == :lt group2_id = group2.id refute_receive {:delete_membership, _actor_id, ^group2_id} @@ -1233,7 +1233,6 @@ defmodule Domain.ActorsTest do :ok = Domain.Policies.subscribe_to_events_for_actor(identity2.actor_id) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group1) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group2) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) assert {:ok, %{ @@ -1264,8 +1263,8 @@ defmodule Domain.ActorsTest do assert_receive {:delete_membership, ^actor2_id, ^group2_id} assert_receive {:reject_access, _policy_id, ^group2_id, _resource_id} - flow_id = flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "deletes memberships of removed groups", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index 00779b5b0..a92a54673 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -635,8 +635,6 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") @@ -735,19 +733,18 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index 356bbd3c8..f9b9fa9d4 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -422,8 +422,6 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}") @@ -494,19 +492,18 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index a8bee3668..a36d760d8 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -472,8 +472,6 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") MicrosoftEntraDirectory.mock_groups_list_endpoint( @@ -567,19 +565,18 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "stops the sync retries on 401 error on the provider", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index ed6a73d61..5aa2c9be4 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -714,8 +714,6 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups)) @@ -800,19 +798,18 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth_test.exs b/elixir/apps/domain/test/domain/auth_test.exs index 1dfa103ad..4df2b00e1 100644 --- a/elixir/apps/domain/test/domain/auth_test.exs +++ b/elixir/apps/domain/test/domain/auth_test.exs @@ -1881,7 +1881,6 @@ defmodule Domain.AuthTest do end :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) attrs_list = [ %{ @@ -1939,8 +1938,8 @@ defmodule Domain.AuthTest do assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: "disconnect", payload: nil} # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + reloaded_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt end test "circuit breaker prevents mass deletions of identities", %{ diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index 839f21c4b..47ec95c78 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1081,14 +1081,11 @@ defmodule Domain.ClientsTest do subject: subject ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, client} = verify_client(client, subject) - assert {:ok, client} = remove_client_verification(client, subject) + assert {:ok, _client} = remove_client_verification(client, subject) - assert_received {:expire_flow, flow_id, flow_client_id, _flow_resource_id} - assert flow_id == flow.id - assert flow_client_id == client.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to verify clients", %{ diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs index 87b7679d5..b38307b49 100644 --- a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs @@ -1,5 +1,5 @@ defmodule Domain.Events.Hooks.FlowsTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Flows setup do @@ -13,14 +13,81 @@ defmodule Domain.Events.Hooks.FlowsTest do end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do + test "broadcasts expire_flow if flow is expired" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + assert :ok == on_update(old_data, data) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} + end + + test "does not broadcast expire_flow if flow is not expired" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + + assert :ok == on_update(old_data, data) + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} + end + + test "does not receive broadcast when not subscribed" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + assert :ok == on_update(old_data, data) + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end describe "delete/1" do - test "returns :ok", %{data: data} do - assert :ok == on_delete(data) + test "broadcasts expire_flow" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + + assert :ok == on_delete(old_data) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 4c05cdca6..d24f8c3bf 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -1047,19 +1047,15 @@ defmodule Domain.FlowsTest do assert expired_flow.id == flow.id end - test "broadcast flow expiration events", %{ + test "updates flow expiration expires_at", %{ flow: flow, actor: actor, subject: subject } do - :ok = subscribe_to_flow_expiration_events(flow) - assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject) - assert_received {:expire_flow, flow_id, flow_client_id, flow_resource_id} - assert flow_id == flow.id - assert flow_client_id == flow.client_id - assert flow_resource_id == flow.resource_id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to expire flows", %{ diff --git a/elixir/apps/domain/test/domain/policies_test.exs b/elixir/apps/domain/test/domain/policies_test.exs index d10f4e602..930736a2e 100644 --- a/elixir/apps/domain/test/domain/policies_test.exs +++ b/elixir/apps/domain/test/domain/policies_test.exs @@ -596,7 +596,6 @@ defmodule Domain.PoliciesTest do :ok = subscribe_to_events_for_policy(policy) :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{resource_id: new_resource.id, actor_group_id: new_actor_group.id} @@ -614,8 +613,8 @@ defmodule Domain.PoliciesTest do assert actor_group_id == policy.actor_group_id assert resource_id == policy.resource_id - assert_receive {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to update policies", %{ @@ -694,15 +693,13 @@ defmodule Domain.PoliciesTest do policy: policy ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, _policy} = disable_policy(policy, subject) expires_at = Repo.one(Domain.Flows.Flow).expires_at assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "broadcasts an account message when policy is disabled", %{ @@ -900,15 +897,13 @@ defmodule Domain.PoliciesTest do policy: policy ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, _policy} = delete_policy(policy, subject) expires_at = Repo.one(Domain.Flows.Flow).expires_at assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "broadcasts an account message when policy is deleted", %{ diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index 1afc3ca4f..ee5d0e104 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -1416,12 +1416,12 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{"name" => "foo"} assert {:updated, _resource} = update_resource(resource, attrs, subject) - refute_receive {:expire_flow, _flow_id, _client_id, _resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt end test "allows to update connections", %{account: account, resource: resource, subject: subject} do @@ -1436,7 +1436,6 @@ defmodule Domain.ResourcesTest do gateway2 = Fixtures.Gateways.create_gateway(account: account) flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{ "connections" => [ @@ -1454,9 +1453,8 @@ defmodule Domain.ResourcesTest do gateway_group_ids = Enum.map(resource.connections, & &1.gateway_group_id) assert gateway_group_ids == [gateway2.group_id] - flow_id = flow.id - resource_id = resource.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "does not allow to remove all connections", %{resource: resource, subject: subject} do @@ -1484,7 +1482,6 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) :ok = subscribe_to_events_for_account(account) attrs = %{"address" => "foo"} @@ -1494,12 +1491,13 @@ defmodule Domain.ResourcesTest do # Resource id doesn't change when updated, but for clarity we still test that we receive the # destructive events for the old resource id and the creation events for the new resource id. - flow_id = flow.id updated_resource_id = updated_resource.id resource_id = resource.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^updated_resource_id} + + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "updates the resource when type is changed", %{resource: resource, subject: subject} do