refactor(portal): Move expire_flow to WAL broadcaster (#9286)

Similar to #9285, we move the `expire_flow` event to be broadcasted from
the WAL broadcaster.

Unrelated tests needed to be updated to not expect to receive the
broadcast, and instead check to ensure the record has been updated.

A minor bug is also fixed in the ordering of the `old_data, data`
fields.

Tested manually on dev.

Related: #6294 
Related: #8187
This commit is contained in:
Jamil
2025-05-28 23:35:03 -07:00
committed by GitHub
parent c1cab32d7f
commit 7c674ea21c
16 changed files with 187 additions and 112 deletions

View File

@@ -1,7 +1,7 @@
defmodule API.Gateway.Channel do
use API, :channel
alias API.Gateway.Views
alias Domain.{Clients, Resources, Relays, Gateways, Flows}
alias Domain.{Clients, Events, Resources, Relays, Gateways, Flows}
alias Domain.Relays.Presence.Debouncer
require Logger
require OpenTelemetry.Tracer
@@ -134,7 +134,7 @@ defmodule API.Gateway.Channel do
client_id: client_id,
resource_id: resource_id
} do
:ok = Flows.unsubscribe_to_flow_expiration_events(flow_id)
:ok = Events.Hooks.Flows.unsubscribe(flow_id)
push(socket, "reject_access", %{
flow_id: flow_id,
@@ -307,7 +307,7 @@ defmodule API.Gateway.Channel do
} = payload
OpenTelemetry.Tracer.with_span "gateway.authorize_flow" do
:ok = Flows.subscribe_to_flow_expiration_events(flow_id)
:ok = Events.Hooks.Flows.subscribe(flow_id)
Logger.debug("Gateway authorizes a new network flow",
flow_id: flow_id,
@@ -378,7 +378,7 @@ defmodule API.Gateway.Channel do
client_id: client_id,
resource_id: resource_id
} do
:ok = Flows.subscribe_to_flow_expiration_events(flow_id)
:ok = Events.Hooks.Flows.subscribe(flow_id)
client = Clients.fetch_client_by_id!(client_id)
resource = Resources.fetch_resource_by_id!(resource_id)
@@ -443,7 +443,7 @@ defmodule API.Gateway.Channel do
} = attrs
OpenTelemetry.Tracer.with_span "gateway.request_connection" do
:ok = Flows.subscribe_to_flow_expiration_events(flow_id)
:ok = Events.Hooks.Flows.subscribe(flow_id)
Logger.debug("Gateway received connection request message",
client_id: client_id,

View File

@@ -220,6 +220,8 @@ defmodule API.Gateway.ChannelTest do
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
assert_push "reject_access", %{
flow_id: flow_id,
client_id: client_id,
@@ -628,6 +630,8 @@ defmodule API.Gateway.ChannelTest do
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
assert_push "reject_access", %{
flow_id: flow_id,
client_id: client_id,
@@ -844,6 +848,8 @@ defmodule API.Gateway.ChannelTest do
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
assert_push "reject_access", %{
flow_id: flow_id,
client_id: client_id,

View File

@@ -9,7 +9,7 @@ defmodule Domain.Events.Event do
it to the appropriate hook module for processing.
"""
def ingest(msg, relations) do
{op, tuple_data, old_tuple_data} = extract_msg_data(msg)
{op, old_tuple_data, tuple_data} = extract_msg_data(msg)
{:ok, relation} = Map.fetch(relations, msg.relation_id)
table = relation.name
@@ -308,7 +308,7 @@ defmodule Domain.Events.Event do
{:insert, nil, data}
end
defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: data, tuple_data: old}) do
defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
{:update, old, data}
end

View File

@@ -1,13 +1,69 @@
defmodule Domain.Events.Hooks.Flows do
alias Domain.PubSub
require Logger
def on_insert(_data) do
:ok
end
def on_update(_old_data, _data) do
:ok
def on_update(
_old_data,
%{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id,
"expires_at" => expires_at
} = _data
) do
if expired?(expires_at) do
# Flow has become expired
broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
else
:ok
end
end
def on_delete(_old_data) do
:ok
# During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases
# where we might manually clear flows in a migration or some other mechanism.
def on_delete(
%{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
} = _old_data
) do
broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
end
def subscribe(flow_id) do
flow_id
|> topic()
|> PubSub.subscribe()
end
def unsubscribe(flow_id) do
flow_id
|> topic()
|> PubSub.unsubscribe()
end
defp expired?(nil), do: false
defp expired?(expires_at) do
with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do
DateTime.compare(DateTime.utc_now(), expires_at) == :gt
else
_ -> false
end
end
defp topic(flow_id) do
"flows:#{flow_id}"
end
defp broadcast(flow_id, payload) do
flow_id
|> topic()
|> PubSub.broadcast(payload)
end
end

View File

@@ -1,5 +1,5 @@
defmodule Domain.Flows do
alias Domain.{Repo, PubSub}
alias Domain.Repo
alias Domain.{Auth, Accounts, Actors, Clients, Gateways, Resources, Policies, Tokens}
alias Domain.Flows.{Authorizer, Flow, Activity}
require Ecto.Query
@@ -242,32 +242,6 @@ defmodule Domain.Flows do
|> Flow.Query.expire()
|> Repo.update_all([])
# TODO: WAL
:ok =
Enum.each(flows, fn flow ->
:ok = broadcast_flow_expiration_event(flow)
end)
{:ok, flows}
end
### PubSub
defp flow_topic(%Flow{} = flow), do: flow_topic(flow.id)
defp flow_topic(flow_id), do: "flows:#{flow_id}"
def subscribe_to_flow_expiration_events(flow_or_id) do
flow_or_id |> flow_topic() |> PubSub.subscribe()
end
def unsubscribe_to_flow_expiration_events(flow_or_id) do
flow_or_id |> flow_topic() |> PubSub.subscribe()
end
# TODO: WAL
defp broadcast_flow_expiration_event(flow) do
flow
|> flow_topic()
|> PubSub.broadcast({:expire_flow, flow.id, flow.client_id, flow.resource_id})
end
end

View File

@@ -873,7 +873,6 @@ defmodule Domain.ActorsTest do
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group3)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group4)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group5)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(group1_flow)
attrs_list = [
%{"name" => "Group:Infrastructure", "provider_identifier" => "G:GROUP_ID2"},
@@ -902,10 +901,11 @@ defmodule Domain.ActorsTest do
group1_id = group1.id
group1_policy_id = group1_policy.id
group1_flow_id = group1_flow.id
assert_receive {:delete_membership, ^actor_id, ^group1_id}
assert_receive {:reject_access, ^group1_policy_id, ^group1_id, _resource_id}
assert_receive {:expire_flow, ^group1_flow_id, _client_id, _resource_id}
group1_flow = Repo.reload(group1_flow)
assert DateTime.compare(group1_flow.expires_at, DateTime.utc_now()) == :lt
group2_id = group2.id
refute_receive {:delete_membership, _actor_id, ^group2_id}
@@ -1233,7 +1233,6 @@ defmodule Domain.ActorsTest do
:ok = Domain.Policies.subscribe_to_events_for_actor(identity2.actor_id)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group1)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group2)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
assert {:ok,
%{
@@ -1264,8 +1263,8 @@ defmodule Domain.ActorsTest do
assert_receive {:delete_membership, ^actor2_id, ^group2_id}
assert_receive {:reject_access, _policy_id, ^group2_id, _resource_id}
flow_id = flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "deletes memberships of removed groups", %{

View File

@@ -635,8 +635,6 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
:ok = Domain.Policies.subscribe_to_events_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor(other_actor)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/")
@@ -735,19 +733,18 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
flow_id = deleted_group_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
# Expires flows for signed out user
flow_id = deleted_identity_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
# Should not do anything else
refute_receive {:create_membership, _actor_id, _group_id}
refute_received {:remove_membership, _actor_id, _group_id}
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
refute_received {:reject_access, _policy_id, _group_id, _resource_id}
refute_received {:expire_flow, _flow_id, _client_id, _resource_id}
end
test "resurrects deleted identities that reappear on the next sync", %{

View File

@@ -422,8 +422,6 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
:ok = Domain.Policies.subscribe_to_events_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor(other_actor)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}")
@@ -494,19 +492,18 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
flow_id = deleted_group_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
# Expires flows for signed out user
flow_id = deleted_identity_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
# Should not do anything else
refute_receive {:create_membership, _actor_id, _group_id}
refute_received {:remove_membership, _actor_id, _group_id}
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
refute_received {:reject_access, _policy_id, _group_id, _resource_id}
refute_received {:expire_flow, _flow_id, _client_id, _resource_id}
end
test "resurrects deleted identities that reappear on the next sync", %{

View File

@@ -472,8 +472,6 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
:ok = Domain.Policies.subscribe_to_events_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor(other_actor)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
MicrosoftEntraDirectory.mock_groups_list_endpoint(
@@ -567,19 +565,18 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
flow_id = deleted_group_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
# Expires flows for signed out user
flow_id = deleted_identity_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
# Should not do anything else
refute_receive {:create_membership, _actor_id, _group_id}
refute_received {:remove_membership, _actor_id, _group_id}
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
refute_received {:reject_access, _policy_id, _group_id, _resource_id}
refute_received {:expire_flow, _flow_id, _client_id, _resource_id}
end
test "stops the sync retries on 401 error on the provider", %{

View File

@@ -714,8 +714,6 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
:ok = Domain.Policies.subscribe_to_events_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor(other_actor)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_group_flow)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups))
@@ -800,19 +798,18 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
flow_id = deleted_group_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
# Expires flows for signed out user
flow_id = deleted_identity_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
# Should not do anything else
refute_receive {:create_membership, _actor_id, _group_id}
refute_received {:remove_membership, _actor_id, _group_id}
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
refute_received {:reject_access, _policy_id, _group_id, _resource_id}
refute_received {:expire_flow, _flow_id, _client_id, _resource_id}
end
test "resurrects deleted identities that reappear on the next sync", %{

View File

@@ -1881,7 +1881,6 @@ defmodule Domain.AuthTest do
end
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
attrs_list = [
%{
@@ -1939,8 +1938,8 @@ defmodule Domain.AuthTest do
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: "disconnect", payload: nil}
# Expires flows for signed out user
flow_id = deleted_identity_flow.id
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
reloaded_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt
end
test "circuit breaker prevents mass deletions of identities", %{

View File

@@ -1081,14 +1081,11 @@ defmodule Domain.ClientsTest do
subject: subject
)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
assert {:ok, client} = verify_client(client, subject)
assert {:ok, client} = remove_client_verification(client, subject)
assert {:ok, _client} = remove_client_verification(client, subject)
assert_received {:expire_flow, flow_id, flow_client_id, _flow_resource_id}
assert flow_id == flow.id
assert flow_client_id == client.id
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "returns error when subject has no permission to verify clients", %{

View File

@@ -1,5 +1,5 @@
defmodule Domain.Events.Hooks.FlowsTest do
use ExUnit.Case, async: true
use Domain.DataCase, async: true
import Domain.Events.Hooks.Flows
setup do
@@ -13,14 +13,81 @@ defmodule Domain.Events.Hooks.FlowsTest do
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
test "broadcasts expire_flow if flow is expired" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
assert :ok == on_update(old_data, data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not broadcast expire_flow if flow is not expired" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
assert :ok == on_update(old_data, data)
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not receive broadcast when not subscribed" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
assert :ok == on_update(old_data, data)
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == on_delete(data)
test "broadcasts expire_flow" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
assert :ok == on_delete(old_data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
end

View File

@@ -1047,19 +1047,15 @@ defmodule Domain.FlowsTest do
assert expired_flow.id == flow.id
end
test "broadcast flow expiration events", %{
test "updates flow expiration expires_at", %{
flow: flow,
actor: actor,
subject: subject
} do
:ok = subscribe_to_flow_expiration_events(flow)
assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject)
assert_received {:expire_flow, flow_id, flow_client_id, flow_resource_id}
assert flow_id == flow.id
assert flow_client_id == flow.client_id
assert flow_resource_id == flow.resource_id
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "returns error when subject has no permission to expire flows", %{

View File

@@ -596,7 +596,6 @@ defmodule Domain.PoliciesTest do
:ok = subscribe_to_events_for_policy(policy)
:ok = subscribe_to_events_for_actor_group(policy.actor_group_id)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
attrs = %{resource_id: new_resource.id, actor_group_id: new_actor_group.id}
@@ -614,8 +613,8 @@ defmodule Domain.PoliciesTest do
assert actor_group_id == policy.actor_group_id
assert resource_id == policy.resource_id
assert_receive {:expire_flow, flow_id, _flow_client_id, _flow_resource_id}
assert flow_id == flow.id
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "returns error when subject has no permission to update policies", %{
@@ -694,15 +693,13 @@ defmodule Domain.PoliciesTest do
policy: policy
)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
assert {:ok, _policy} = disable_policy(policy, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id}
assert flow_id == flow.id
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "broadcasts an account message when policy is disabled", %{
@@ -900,15 +897,13 @@ defmodule Domain.PoliciesTest do
policy: policy
)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
assert {:ok, _policy} = delete_policy(policy, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
assert_received {:expire_flow, flow_id, _flow_client_id, _flow_resource_id}
assert flow_id == flow.id
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "broadcasts an account message when policy is deleted", %{

View File

@@ -1416,12 +1416,12 @@ defmodule Domain.ResourcesTest do
subject: subject
} do
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
attrs = %{"name" => "foo"}
assert {:updated, _resource} = update_resource(resource, attrs, subject)
refute_receive {:expire_flow, _flow_id, _client_id, _resource_id}
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt
end
test "allows to update connections", %{account: account, resource: resource, subject: subject} do
@@ -1436,7 +1436,6 @@ defmodule Domain.ResourcesTest do
gateway2 = Fixtures.Gateways.create_gateway(account: account)
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
attrs = %{
"connections" => [
@@ -1454,9 +1453,8 @@ defmodule Domain.ResourcesTest do
gateway_group_ids = Enum.map(resource.connections, & &1.gateway_group_id)
assert gateway_group_ids == [gateway2.group_id]
flow_id = flow.id
resource_id = resource.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "does not allow to remove all connections", %{resource: resource, subject: subject} do
@@ -1484,7 +1482,6 @@ defmodule Domain.ResourcesTest do
subject: subject
} do
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(flow)
:ok = subscribe_to_events_for_account(account)
attrs = %{"address" => "foo"}
@@ -1494,12 +1491,13 @@ defmodule Domain.ResourcesTest do
# Resource id doesn't change when updated, but for clarity we still test that we receive the
# destructive events for the old resource id and the creation events for the new resource id.
flow_id = flow.id
updated_resource_id = updated_resource.id
resource_id = resource.id
assert_receive {:expire_flow, ^flow_id, _client_id, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:create_resource, ^updated_resource_id}
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
end
test "updates the resource when type is changed", %{resource: resource, subject: subject} do