diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index dcb0939a2..8f92b3a8e 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Clients, Resources, Relays, Gateways, Flows} + alias Domain.{Clients, Events, Resources, Relays, Gateways, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -134,7 +134,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Flows.unsubscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.unsubscribe(flow_id) push(socket, "reject_access", %{ flow_id: flow_id, @@ -307,7 +307,7 @@ defmodule API.Gateway.Channel do } = payload OpenTelemetry.Tracer.with_span "gateway.authorize_flow" do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) Logger.debug("Gateway authorizes a new network flow", flow_id: flow_id, @@ -378,7 +378,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) client = Clients.fetch_client_by_id!(client_id) resource = Resources.fetch_resource_by_id!(resource_id) @@ -443,7 +443,7 @@ defmodule API.Gateway.Channel do } = attrs OpenTelemetry.Tracer.with_span "gateway.request_connection" do - :ok = Flows.subscribe_to_flow_expiration_events(flow_id) + :ok = Events.Hooks.Flows.subscribe(flow_id) Logger.debug("Gateway received connection request message", client_id: client_id, diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index a4535058f..525ee7f35 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -220,6 +220,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, @@ -628,6 +630,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, @@ -844,6 +848,8 @@ defmodule API.Gateway.ChannelTest do {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) + assert_push "reject_access", %{ flow_id: flow_id, client_id: client_id, diff --git a/elixir/apps/domain/lib/domain/events/event.ex b/elixir/apps/domain/lib/domain/events/event.ex index 2ee3959d0..9fb065e79 100644 --- a/elixir/apps/domain/lib/domain/events/event.ex +++ b/elixir/apps/domain/lib/domain/events/event.ex @@ -9,7 +9,7 @@ defmodule Domain.Events.Event do it to the appropriate hook module for processing. """ def ingest(msg, relations) do - {op, tuple_data, old_tuple_data} = extract_msg_data(msg) + {op, old_tuple_data, tuple_data} = extract_msg_data(msg) {:ok, relation} = Map.fetch(relations, msg.relation_id) table = relation.name @@ -308,7 +308,7 @@ defmodule Domain.Events.Event do {:insert, nil, data} end - defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: data, tuple_data: old}) do + defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do {:update, old, data} end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex index 10317706a..a0367cb3c 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex @@ -1,13 +1,69 @@ defmodule Domain.Events.Hooks.Flows do + alias Domain.PubSub + require Logger + def on_insert(_data) do :ok end - def on_update(_old_data, _data) do - :ok + def on_update( + _old_data, + %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id, + "expires_at" => expires_at + } = _data + ) do + if expired?(expires_at) do + # Flow has become expired + broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) + else + :ok + end end - def on_delete(_old_data) do - :ok + # During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases + # where we might manually clear flows in a migration or some other mechanism. + def on_delete( + %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } = _old_data + ) do + broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) + end + + def subscribe(flow_id) do + flow_id + |> topic() + |> PubSub.subscribe() + end + + def unsubscribe(flow_id) do + flow_id + |> topic() + |> PubSub.unsubscribe() + end + + defp expired?(nil), do: false + + defp expired?(expires_at) do + with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do + DateTime.compare(DateTime.utc_now(), expires_at) == :gt + else + _ -> false + end + end + + defp topic(flow_id) do + "flows:#{flow_id}" + end + + defp broadcast(flow_id, payload) do + flow_id + |> topic() + |> PubSub.broadcast(payload) end end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index d944cb3b7..9255bc05f 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -1,5 +1,5 @@ defmodule Domain.Flows do - alias Domain.{Repo, PubSub} + alias Domain.Repo alias Domain.{Auth, Accounts, Actors, Clients, Gateways, Resources, Policies, Tokens} alias Domain.Flows.{Authorizer, Flow, Activity} require Ecto.Query @@ -242,32 +242,6 @@ defmodule Domain.Flows do |> Flow.Query.expire() |> Repo.update_all([]) - # TODO: WAL - :ok = - Enum.each(flows, fn flow -> - :ok = broadcast_flow_expiration_event(flow) - end) - {:ok, flows} end - - ### PubSub - - defp flow_topic(%Flow{} = flow), do: flow_topic(flow.id) - defp flow_topic(flow_id), do: "flows:#{flow_id}" - - def subscribe_to_flow_expiration_events(flow_or_id) do - flow_or_id |> flow_topic() |> PubSub.subscribe() - end - - def unsubscribe_to_flow_expiration_events(flow_or_id) do - flow_or_id |> flow_topic() |> PubSub.subscribe() - end - - # TODO: WAL - defp broadcast_flow_expiration_event(flow) do - flow - |> flow_topic() - |> PubSub.broadcast({:expire_flow, flow.id, flow.client_id, flow.resource_id}) - end end diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 565180377..835b14b5f 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -873,7 +873,6 @@ defmodule Domain.ActorsTest do :ok = Domain.Policies.subscribe_to_events_for_actor_group(group3) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group4) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group5) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(group1_flow) attrs_list = [ %{"name" => "Group:Infrastructure", "provider_identifier" => "G:GROUP_ID2"}, @@ -902,10 +901,11 @@ defmodule Domain.ActorsTest do group1_id = group1.id group1_policy_id = group1_policy.id - group1_flow_id = group1_flow.id assert_receive {:delete_membership, ^actor_id, ^group1_id} assert_receive {:reject_access, ^group1_policy_id, ^group1_id, _resource_id} - assert_receive {:expire_flow, ^group1_flow_id, _client_id, _resource_id} + + group1_flow = Repo.reload(group1_flow) + assert DateTime.compare(group1_flow.expires_at, DateTime.utc_now()) == :lt group2_id = group2.id refute_receive {:delete_membership, _actor_id, ^group2_id} @@ -1233,7 +1233,6 @@ defmodule Domain.ActorsTest do :ok = Domain.Policies.subscribe_to_events_for_actor(identity2.actor_id) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group1) :ok = Domain.Policies.subscribe_to_events_for_actor_group(group2) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) assert {:ok, %{ @@ -1264,8 +1263,8 @@ defmodule Domain.ActorsTest do assert_receive {:delete_membership, ^actor2_id, ^group2_id} assert_receive {:reject_access, _policy_id, ^group2_id, _resource_id} - flow_id = flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "deletes memberships of removed groups", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index 00779b5b0..a92a54673 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -635,8 +635,6 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") @@ -735,19 +733,18 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index 356bbd3c8..f9b9fa9d4 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -422,8 +422,6 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}") @@ -494,19 +492,18 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index a8bee3668..a36d760d8 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -472,8 +472,6 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") MicrosoftEntraDirectory.mock_groups_list_endpoint( @@ -567,19 +565,18 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "stops the sync retries on 401 error on the provider", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index ed6a73d61..5aa2c9be4 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -714,8 +714,6 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do :ok = Domain.Policies.subscribe_to_events_for_actor(actor) :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups)) @@ -800,19 +798,18 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - flow_id = deleted_group_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + deleted_group_flow = Repo.reload(deleted_group_flow) + assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + deleted_identity_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt # Should not do anything else refute_receive {:create_membership, _actor_id, _group_id} refute_received {:remove_membership, _actor_id, _group_id} refute_received {:allow_access, _policy_id, _group_id, _resource_id} refute_received {:reject_access, _policy_id, _group_id, _resource_id} - refute_received {:expire_flow, _flow_id, _client_id, _resource_id} end test "resurrects deleted identities that reappear on the next sync", %{ diff --git a/elixir/apps/domain/test/domain/auth_test.exs b/elixir/apps/domain/test/domain/auth_test.exs index 1dfa103ad..4df2b00e1 100644 --- a/elixir/apps/domain/test/domain/auth_test.exs +++ b/elixir/apps/domain/test/domain/auth_test.exs @@ -1881,7 +1881,6 @@ defmodule Domain.AuthTest do end :ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}") - :ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow) attrs_list = [ %{ @@ -1939,8 +1938,8 @@ defmodule Domain.AuthTest do assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: "disconnect", payload: nil} # Expires flows for signed out user - flow_id = deleted_identity_flow.id - assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id} + reloaded_flow = Repo.reload(deleted_identity_flow) + assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt end test "circuit breaker prevents mass deletions of identities", %{ diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index 839f21c4b..47ec95c78 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1081,14 +1081,11 @@ defmodule Domain.ClientsTest do subject: subject ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, client} = verify_client(client, subject) - assert {:ok, client} = remove_client_verification(client, subject) + assert {:ok, _client} = remove_client_verification(client, subject) - assert_received {:expire_flow, flow_id, flow_client_id, _flow_resource_id} - assert flow_id == flow.id - assert flow_client_id == client.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to verify clients", %{ diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs index 87b7679d5..b38307b49 100644 --- a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs @@ -1,5 +1,5 @@ defmodule Domain.Events.Hooks.FlowsTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Flows setup do @@ -13,14 +13,81 @@ defmodule Domain.Events.Hooks.FlowsTest do end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do + test "broadcasts expire_flow if flow is expired" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + assert :ok == on_update(old_data, data) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} + end + + test "does not broadcast expire_flow if flow is not expired" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + + assert :ok == on_update(old_data, data) + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} + end + + test "does not receive broadcast when not subscribed" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{} + + data = %{ + "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + assert :ok == on_update(old_data, data) + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end describe "delete/1" do - test "returns :ok", %{data: data} do - assert :ok == on_delete(data) + test "broadcasts expire_flow" do + flow_id = "flow_123" + client_id = "client_123" + resource_id = "resource_123" + + old_data = %{ + "id" => flow_id, + "client_id" => client_id, + "resource_id" => resource_id + } + + :ok = subscribe(flow_id) + + assert :ok == on_delete(old_data) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 4c05cdca6..d24f8c3bf 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -1047,19 +1047,15 @@ defmodule Domain.FlowsTest do assert expired_flow.id == flow.id end - test "broadcast flow expiration events", %{ + test "updates flow expiration expires_at", %{ flow: flow, actor: actor, subject: subject } do - :ok = subscribe_to_flow_expiration_events(flow) - assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject) - assert_received {:expire_flow, flow_id, flow_client_id, flow_resource_id} - assert flow_id == flow.id - assert flow_client_id == flow.client_id - assert flow_resource_id == flow.resource_id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to expire flows", %{ diff --git a/elixir/apps/domain/test/domain/policies_test.exs b/elixir/apps/domain/test/domain/policies_test.exs index d10f4e602..930736a2e 100644 --- a/elixir/apps/domain/test/domain/policies_test.exs +++ b/elixir/apps/domain/test/domain/policies_test.exs @@ -596,7 +596,6 @@ defmodule Domain.PoliciesTest do :ok = subscribe_to_events_for_policy(policy) :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{resource_id: new_resource.id, actor_group_id: new_actor_group.id} @@ -614,8 +613,8 @@ defmodule Domain.PoliciesTest do assert actor_group_id == policy.actor_group_id assert resource_id == policy.resource_id - assert_receive {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "returns error when subject has no permission to update policies", %{ @@ -694,15 +693,13 @@ defmodule Domain.PoliciesTest do policy: policy ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, _policy} = disable_policy(policy, subject) expires_at = Repo.one(Domain.Flows.Flow).expires_at assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "broadcasts an account message when policy is disabled", %{ @@ -900,15 +897,13 @@ defmodule Domain.PoliciesTest do policy: policy ) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) - assert {:ok, _policy} = delete_policy(policy, subject) expires_at = Repo.one(Domain.Flows.Flow).expires_at assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id} - assert flow_id == flow.id + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "broadcasts an account message when policy is deleted", %{ diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index 1afc3ca4f..ee5d0e104 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -1416,12 +1416,12 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{"name" => "foo"} assert {:updated, _resource} = update_resource(resource, attrs, subject) - refute_receive {:expire_flow, _flow_id, _client_id, _resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt end test "allows to update connections", %{account: account, resource: resource, subject: subject} do @@ -1436,7 +1436,6 @@ defmodule Domain.ResourcesTest do gateway2 = Fixtures.Gateways.create_gateway(account: account) flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) attrs = %{ "connections" => [ @@ -1454,9 +1453,8 @@ defmodule Domain.ResourcesTest do gateway_group_ids = Enum.map(resource.connections, & &1.gateway_group_id) assert gateway_group_ids == [gateway2.group_id] - flow_id = flow.id - resource_id = resource.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "does not allow to remove all connections", %{resource: resource, subject: subject} do @@ -1484,7 +1482,6 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) - :ok = Domain.Flows.subscribe_to_flow_expiration_events(flow) :ok = subscribe_to_events_for_account(account) attrs = %{"address" => "foo"} @@ -1494,12 +1491,13 @@ defmodule Domain.ResourcesTest do # Resource id doesn't change when updated, but for clarity we still test that we receive the # destructive events for the old resource id and the creation events for the new resource id. - flow_id = flow.id updated_resource_id = updated_resource.id resource_id = resource.id - assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^updated_resource_id} + + flow = Repo.reload(flow) + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end test "updates the resource when type is changed", %{resource: resource, subject: subject} do