refactor(portal): don't rely on flows.expires_at (#9692)

The `expires_at` column on the `flows` table was never used outside of
the context in which the flow was created in the Client Channel. This
ephemeral state, which is created in the `Domain.Flows.authorize_flow/4`
function, is never read from the DB in any meaningful capacity, so it
can be safely removed.

The `expire_flows_for` family of functions now simply reads the needed
fields from the flows table in order to broadcast `{:expire_flow,
flow_id, client_id, resource_id}` directly to the subscribed entities.

This PR is step 1 in removing the reliance on `Flows` to manage
ephemeral access state. In a subsequent PR we will actually change the
structure of what state is kept in the channel PIDs such that reliance
on this Flows table will no longer be necessary.

Additionally, in a few places, we were referencing a Flows.Show view
that was never available in production, so this dead code has been
removed.

Lastly, the `flows` table subscription and associated hook processing
has been completely removed as it is no longer needed. We've implemented
in #9667 logic to remove publications from removed table subscriptions,
so we can expect to get a couple ingest warnings when we deploy this as
the `Hooks.Flows` processor no longer exists, and the WAL data may have
lingering flows records in the queue. These can be safely ignored.
This commit is contained in:
Jamil
2025-06-27 11:29:12 -07:00
committed by GitHub
parent 8cfc7ad865
commit 0b09d9f2f5
45 changed files with 386 additions and 920 deletions

View File

@@ -120,7 +120,7 @@ jobs:
with:
project: firezone-staging
- name: Seed database
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed'
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.migrate --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations && mix ecto.seed'
- name: Start docker compose in the background
run: |
set -xe

View File

@@ -306,7 +306,7 @@ jobs:
with:
project: firezone-staging
- name: Seed database
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed'
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations'
- name: Start docker compose in the background
run: |
# We need to increase the log level to make sure that they don't hold off storm of packets

View File

@@ -643,7 +643,7 @@ defmodule API.Client.Channel do
OpenTelemetry.Tracer.set_attribute(:gateways_count, length(gateways)),
gateway = Gateways.load_balance_gateways(location, gateways, connected_gateway_ids),
OpenTelemetry.Tracer.set_attribute(:gateway_id, gateway.id),
{:ok, resource, flow} <-
{:ok, resource, flow, expires_at} <-
Flows.authorize_flow(
socket.assigns.client,
gateway,
@@ -665,7 +665,7 @@ defmodule API.Client.Channel do
client_id: socket.assigns.client.id,
resource_id: resource.id,
flow_id: flow.id,
authorization_expires_at: flow.expires_at,
authorization_expires_at: expires_at,
ice_credentials: ice_credentials,
preshared_key: preshared_key
}, {opentelemetry_ctx, opentelemetry_span_ctx}}
@@ -789,7 +789,7 @@ defmodule API.Client.Channel do
OpenTelemetry.Tracer.with_span "client.reuse_connection", attributes: attrs do
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
{:ok, resource, flow} <-
{:ok, resource, flow, _expires_at} <-
Flows.authorize_flow(
socket.assigns.client,
gateway,
@@ -851,7 +851,7 @@ defmodule API.Client.Channel do
OpenTelemetry.Tracer.with_span "client.request_connection", attributes: ctx_attrs do
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
{:ok, resource, flow} <-
{:ok, resource, flow, _expires_at} <-
Flows.authorize_flow(
socket.assigns.client,
gateway,

View File

@@ -224,7 +224,7 @@ defmodule API.Gateway.ChannelTest do
assert_push "allow_access", %{}
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
@@ -647,7 +647,7 @@ defmodule API.Gateway.ChannelTest do
assert_push "request_connection", %{}
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
@@ -878,7 +878,7 @@ defmodule API.Gateway.ChannelTest do
assert_push "authorize_flow", %{}
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})

View File

@@ -34,13 +34,6 @@ defmodule Domain.ChangeLogs.ReplicationConnection do
:ok
end
defp log(_op, _lsn, "flows", _old_data, _data) do
# TODO: WAL
# Flows are not logged to the change log as they are used only to trigger side effects which
# will be removed. Remove the flows table publication when that happens.
:ok
end
defp log(op, lsn, table, old_data, data) do
attrs = %{
op: op,

View File

@@ -212,8 +212,10 @@ defmodule Domain.Clients do
preload: [:online?]
)
|> case do
# TODO: WAL
# Broadcast flow side effects directly
{:ok, client} ->
{:ok, _flows} = Flows.expire_flows_for(client)
:ok = Flows.expire_flows_for(client)
{:ok, client}
{:error, reason} ->

View File

@@ -22,7 +22,10 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do
Task.start(fn ->
:ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id})
broadcast_access(:reject, actor_id, group_id)
{:ok, _flows} = Flows.expire_flows_for(account_id, actor_id, group_id)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Flows.expire_flows_for(account_id, actor_id, group_id)
end)
:ok

View File

@@ -1,51 +0,0 @@
# TODO: WAL
# Move side-effects from flows to state table in clients and gateways
defmodule Domain.Events.Hooks.Flows do
@behaviour Domain.Events.Hooks
alias Domain.PubSub
require Logger
@impl true
def on_insert(_data), do: :ok
@impl true
def on_update(
_old_data,
%{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id,
"expires_at" => expires_at
} = _data
) do
if expired?(expires_at) do
# Flow has become expired
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
else
:ok
end
end
# During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases
# where we might manually clear flows in a migration or some other mechanism.
@impl true
def on_delete(
%{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
} = _old_data
) do
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
end
defp expired?(nil), do: false
defp expired?(expires_at) do
with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do
DateTime.compare(DateTime.utc_now(), expires_at) == :gt
else
_ -> false
end
end
end

View File

@@ -69,7 +69,9 @@ defmodule Domain.Events.Hooks.Policies do
payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
Flows.expire_flows_for_policy_id(account_id, policy_id)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
end)
:ok
@@ -124,7 +126,9 @@ defmodule Domain.Events.Hooks.Policies do
payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
Flows.expire_flows_for_policy_id(account_id, policy_id)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
end)
else
Logger.warning("Breaking update ignored for policy as it is deleted or disabled",
@@ -161,7 +165,9 @@ defmodule Domain.Events.Hooks.Policies do
payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
Flows.expire_flows_for_policy_id(account_id, policy_id)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
end)
:ok

View File

@@ -11,11 +11,10 @@ defmodule Domain.Events.Hooks.ResourceConnections do
@impl true
def on_delete(%{"account_id" => account_id, "resource_id" => resource_id} = _old_data) do
# TODO: WAL
# The flow expires_at field is not used for any persistence-related reason.
# Remove it and broadcast directly to subscribed pids to remove the flow
# from their local state. This hook is called when resources change sites.
# Broadcast flow side effects directly
# This hook is called when resources change sites.
Task.start(fn ->
{:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id)
:ok = Flows.expire_flows_for_resource_id(account_id, resource_id)
end)
:ok

View File

@@ -39,7 +39,7 @@ defmodule Domain.Events.Hooks.Resources do
old_filters != filters or
old_ip_stack != ip_stack do
# TODO: WAL
# Directly broadcast to subscribed pids to remove the flow
# Broadcast flow side effects directly
Task.start(fn ->
payload = {:delete_resource, resource_id}
PubSub.Resource.broadcast(resource_id, payload)
@@ -49,7 +49,7 @@ defmodule Domain.Events.Hooks.Resources do
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
{:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id)
:ok = Flows.expire_flows_for_resource_id(account_id, resource_id)
end)
:ok

View File

@@ -19,7 +19,6 @@ defmodule Domain.Events.ReplicationConnection do
"auth_providers" => Hooks.AuthProviders,
"clients" => Hooks.Clients,
"flow_activities" => Hooks.FlowActivities,
"flows" => Hooks.Flows,
"gateway_groups" => Hooks.GatewayGroups,
"gateways" => Hooks.Gateways,
"policies" => Hooks.Policies,

View File

@@ -43,12 +43,13 @@ defmodule Domain.Flows do
account_id: account_id,
client_remote_ip: client_remote_ip,
client_user_agent: client_user_agent,
gateway_remote_ip: gateway_remote_ip,
expires_at: conformation_expires_at || expires_at
gateway_remote_ip: gateway_remote_ip
})
|> Repo.insert!()
{:ok, resource, flow}
expires_at = conformation_expires_at || expires_at
{:ok, resource, flow, expires_at}
end
end
@@ -250,11 +251,32 @@ defmodule Domain.Flows do
end
defp expire_flows(queryable) do
{_count, flows} =
queryable
|> Flow.Query.expire()
|> Repo.update_all([])
{:ok, :ok} =
Repo.transaction(fn ->
queryable
|> Repo.stream()
|> Stream.chunk_every(100)
|> Enum.each(fn chunk ->
Enum.each(chunk, &broadcast_flow_expiration/1)
end)
end)
{:ok, flows}
:ok
end
defp broadcast_flow_expiration(flow) do
case Domain.PubSub.Flow.broadcast(
flow.id,
{:expire_flow, flow.id, flow.client_id, flow.resource_id}
) do
:ok ->
:ok
{:error, reason} ->
Logger.error("Failed to broadcast flow expiration",
reason: inspect(reason),
flow_id: flow.id
)
end
end
end

View File

@@ -1,3 +1,5 @@
# TODO: Flows -> PolicyAuthorizations
# When this is renamed, "PolicyAuthorizations Authorizer" makes no sense, remove this module
defmodule Domain.Flows.Authorizer do
use Domain.Auth.Authorizer
alias Domain.Flows.{Flow, Activity}

View File

@@ -15,7 +15,6 @@ defmodule Domain.Flows.Flow do
field :gateway_remote_ip, Domain.Types.IP
field :expires_at, :utc_datetime_usec
timestamps(updated_at: false)
end
end

View File

@@ -5,14 +5,12 @@ defmodule Domain.Flows.Flow.Changeset do
@fields ~w[token_id policy_id client_id gateway_id resource_id
account_id
client_remote_ip client_user_agent
gateway_remote_ip
expires_at]a
@required_fields @fields -- ~w[expires_at]a
gateway_remote_ip]a
def create(attrs) do
%Flow{}
|> cast(attrs, @fields)
|> validate_required(@required_fields)
|> validate_required(@fields)
|> assoc_constraint(:token)
|> assoc_constraint(:policy)
|> assoc_constraint(:client)

View File

@@ -5,10 +5,6 @@ defmodule Domain.Flows.Flow.Query do
from(flows in Domain.Flows.Flow, as: :flows)
end
def not_expired(queryable \\ all()) do
where(queryable, [flows: flows], flows.expires_at > fragment("timezone('UTC', NOW())"))
end
def by_id(queryable, id) do
where(queryable, [flows: flows], flows.id == ^id)
end
@@ -61,17 +57,6 @@ defmodule Domain.Flows.Flow.Query do
where(queryable, [flows: flows], flows.gateway_id == ^gateway_id)
end
def expire(queryable) do
queryable
|> not_expired()
|> Ecto.Query.select([flows: flows], flows)
|> Ecto.Query.update([flows: flows],
set: [
expires_at: fragment("LEAST(?, timezone('UTC', NOW()))", flows.expires_at)
]
)
end
def with_joined_policy(queryable) do
with_named_binding(queryable, :policy, fn queryable, binding ->
join(queryable, :inner, [flows: flows], policy in assoc(flows, ^binding), as: ^binding)
@@ -100,27 +85,4 @@ defmodule Domain.Flows.Flow.Query do
{:flows, :desc, :inserted_at},
{:flows, :asc, :id}
]
@impl Domain.Repo.Query
def filters,
do: [
%Domain.Repo.Filter{
name: :expiration,
title: "Expired",
type: :string,
values: [
{"Expired", "expired"},
{"Not Expired", "not_expired"}
],
fun: &filter_by_expired/2
}
]
def filter_by_expired(queryable, "expired") do
{queryable, dynamic([flows: flows], flows.expires_at < fragment("timezone('UTC', NOW())"))}
end
def filter_by_expired(queryable, "not_expired") do
{queryable, dynamic([flows: flows], flows.expires_at >= fragment("timezone('UTC', NOW())"))}
end
end

View File

@@ -183,7 +183,9 @@ defmodule Domain.Tokens do
]}
with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do
{:ok, _flows} = Domain.Flows.expire_flows_for(token, subject)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(token, subject)
Token.Query.not_deleted()
|> Token.Query.by_id(token.id)
@@ -203,7 +205,9 @@ defmodule Domain.Tokens do
|> delete_tokens()
|> case do
{:ok, [token]} ->
{:ok, _flows} = Domain.Flows.expire_flows_for(token, subject)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(token, subject)
{:ok, token}
{:ok, []} ->
@@ -212,7 +216,9 @@ defmodule Domain.Tokens do
end
def delete_tokens_for(%Auth.Identity{} = identity) do
{:ok, _flows} = Domain.Flows.expire_flows_for(identity)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(identity)
Token.Query.not_deleted()
|> Token.Query.by_identity_id(identity.id)
@@ -221,7 +227,9 @@ defmodule Domain.Tokens do
def delete_tokens_for(%Actors.Actor{} = actor, %Auth.Subject{} = subject) do
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
{:ok, _flows} = Domain.Flows.expire_flows_for(actor, subject)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(actor, subject)
Token.Query.not_deleted()
|> Token.Query.by_actor_id(actor.id)
@@ -232,7 +240,9 @@ defmodule Domain.Tokens do
def delete_tokens_for(%Auth.Identity{} = identity, %Auth.Subject{} = subject) do
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
{:ok, _flows} = Domain.Flows.expire_flows_for(identity, subject)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(identity, subject)
Token.Query.not_deleted()
|> Token.Query.by_identity_id(identity.id)
@@ -243,7 +253,9 @@ defmodule Domain.Tokens do
def delete_tokens_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
{:ok, _flows} = Domain.Flows.expire_flows_for(provider, subject)
# TODO: WAL
# Broadcast flow side effects directly
:ok = Domain.Flows.expire_flows_for(provider, subject)
Token.Query.not_deleted()
|> Token.Query.by_provider_id(provider.id)

View File

@@ -0,0 +1,15 @@
defmodule Domain.Repo.Migrations.DropExpiresAtFromFlows do
use Ecto.Migration
def up do
alter table(:flows) do
remove(:expires_at)
end
end
def down do
alter table(:flows) do
add(:expires_at, :utc_datetime_usec, null: false)
end
end
end

View File

@@ -0,0 +1,15 @@
defmodule Domain.Repo.Migrations.ChangeColumnNullFlowsExpiresAt do
use Ecto.Migration
def up do
alter table(:flows) do
modify(:expires_at, :utc_datetime_usec, null: true)
end
end
def down do
alter table(:flows) do
modify(:expires_at, :utc_datetime_usec, null: false)
end
end
end

View File

@@ -1176,7 +1176,7 @@ defmodule Domain.Repo.Seeds do
IO.puts("")
{:ok, _resource, flow} =
{:ok, _resource, flow, _expires_at} =
Flows.authorize_flow(
user_iphone,
gateway1,

View File

@@ -1248,12 +1248,12 @@ defmodule Domain.ActorsTest do
"group_id" => group2.id
})
# TODO: WAL
# Remove this when direct broadcast is implemented
Process.sleep(100)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "deletes memberships of removed groups", %{
@@ -3043,16 +3043,21 @@ defmodule Domain.ActorsTest do
subject = Fixtures.Auth.create_subject(identity: identity)
client = Fixtures.Clients.create_client(account: account, identity: identity)
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
assert {:ok, _actor} = disable_actor(actor, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when trying to disable the last admin actor" do
@@ -3287,16 +3292,21 @@ defmodule Domain.ActorsTest do
} do
client = Fixtures.Clients.create_client(account: account, identity: identity)
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
assert {:ok, _actor} = delete_actor(actor, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when trying to delete the last admin actor", %{

View File

@@ -629,6 +629,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit)
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
@@ -752,12 +754,16 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
Process.sleep(100)
# Deleted policies expire all flows authorized by them
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_group_flow.id
client_id = deleted_group_flow.client_id
resource_id = deleted_group_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Expires flows for signed out user
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_identity_flow.id
client_id = deleted_identity_flow.client_id
resource_id = deleted_identity_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Should not do anything else
refute_received {:allow_access, _policy_id, _group_id, _resource_id}

View File

@@ -416,6 +416,8 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
@@ -516,12 +518,16 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
Process.sleep(100)
# Deleted policies expire all flows authorized by them
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_group_flow.id
client_id = deleted_group_flow.client_id
resource_id = deleted_group_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Expires flows for signed out user
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_identity_flow.id
client_id = deleted_identity_flow.client_id
resource_id = deleted_identity_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Should not do anything else
refute_received {:allow_access, _policy_id, _group_id, _resource_id}

View File

@@ -464,6 +464,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
@@ -583,12 +585,16 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
Process.sleep(100)
# Deleted policies expire all flows authorized by them
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_group_flow.id
client_id = deleted_group_flow.client_id
resource_id = deleted_group_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Expires flows for signed out user
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_identity_flow.id
client_id = deleted_identity_flow.client_id
resource_id = deleted_identity_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Should not do anything else
refute_received {:allow_access, _policy_id, _group_id, _resource_id}

View File

@@ -708,6 +708,8 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
@@ -818,12 +820,16 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_group_flow.id
client_id = deleted_group_flow.client_id
resource_id = deleted_group_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Expires flows for signed out user
deleted_identity_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_identity_flow.id
client_id = deleted_identity_flow.client_id
resource_id = deleted_identity_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
# Should not do anything else
refute_received {:allow_access, _policy_id, _group_id, _resource_id}

View File

@@ -1062,16 +1062,21 @@ defmodule Domain.AuthTest do
} do
client = Fixtures.Clients.create_client(account: account, identity: identity)
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = client.id
resource_id = flow.resource_id
assert {:ok, _provider} = disable_provider(provider, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when trying to disable the last provider", %{
@@ -1282,16 +1287,21 @@ defmodule Domain.AuthTest do
} do
client = Fixtures.Clients.create_client(account: account, identity: identity)
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = client.id
resource_id = flow.resource_id
assert {:ok, _provider} = delete_provider(provider, subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when trying to delete the last provider", %{
@@ -1882,6 +1892,8 @@ defmodule Domain.AuthTest do
token_id: deleted_identity_token.id
)
:ok = Domain.PubSub.Flow.subscribe(deleted_identity_flow.id)
for n <- 1..4 do
Fixtures.Auth.create_identity(
account: account,
@@ -1948,8 +1960,10 @@ defmodule Domain.AuthTest do
assert deleted_identity_token.deleted_at
# Expires flows for signed out user
reloaded_flow = Repo.reload(deleted_identity_flow)
assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt
flow_id = deleted_identity_flow.id
client_id = deleted_identity_flow.client_id
resource_id = deleted_identity_flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "circuit breaker prevents mass deletions of identities", %{
@@ -2783,18 +2797,23 @@ defmodule Domain.AuthTest do
} do
client = Fixtures.Clients.create_client(account: account, identity: identity)
Fixtures.Flows.create_flow(
account: account,
identity: identity,
actor: actor,
subject: subject,
client: client
)
flow =
Fixtures.Flows.create_flow(
account: account,
identity: identity,
actor: actor,
subject: subject,
client: client
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert delete_identities_for(actor, subject) == :ok
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not remove identities that belong to another actor", %{

View File

@@ -1092,11 +1092,15 @@ defmodule Domain.ClientsTest do
subject: subject
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
assert {:ok, client} = verify_client(client, subject)
assert {:ok, _client} = remove_client_verification(client, subject)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
flow_id = flow.id
client_id = client.id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when subject has no permission to verify clients", %{

View File

@@ -1,93 +0,0 @@
defmodule Domain.Events.Hooks.FlowsTest do
use Domain.DataCase, async: true
import Domain.Events.Hooks.Flows
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == on_insert(data)
end
end
describe "update/2" do
test "broadcasts expire_flow if flow is expired" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_update(old_data, data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not broadcast expire_flow if flow is not expired" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_update(old_data, data)
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not receive broadcast when not subscribed" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{}
data = %{
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
assert :ok == on_update(old_data, data)
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
describe "delete/1" do
test "broadcasts expire_flow" do
flow_id = "flow_123"
client_id = "client_123"
resource_id = "resource_123"
old_data = %{
"id" => flow_id,
"client_id" => client_id,
"resource_id" => resource_id
}
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_delete(old_data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
end

View File

@@ -57,6 +57,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do
test "disable: broadcasts :disable_policy and :reject_access" do
flow = Fixtures.Flows.create_flow()
flow_id = flow.id
client_id = flow.client_id
policy_id = flow.policy_id
account_id = flow.account_id
actor_group_id = "group-456"
@@ -72,6 +74,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z")
:ok = PubSub.Flow.subscribe(flow_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
@@ -85,13 +88,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "soft-delete: broadcasts :delete_policy and :reject_access" do
flow = Fixtures.Flows.create_flow()
flow_id = flow.id
client_id = flow.client_id
policy_id = flow.policy_id
account_id = flow.account_id
actor_group_id = "group-456"
@@ -107,6 +110,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
:ok = PubSub.Flow.subscribe(flow_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
@@ -120,13 +124,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do
flow = Fixtures.Flows.create_flow()
flow_id = flow.id
client_id = flow.client_id
policy_id = flow.policy_id
account_id = flow.account_id
actor_group_id = "group-456"
@@ -142,12 +146,17 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "resource_id", "new-resource-123")
:ok = PubSub.Flow.subscribe(flow_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
# TODO: WAL
# Remove this when side effects are directly broadcasted
Process.sleep(100)
assert_receive {:delete_policy, ^policy_id}
assert_receive {:delete_policy, ^policy_id}
assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id}
@@ -160,13 +169,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "breaking update: disabled policy has no side-effects" do
flow = Fixtures.Flows.create_flow()
flow_id = flow.id
client_id = flow.client_id
policy_id = flow.policy_id
account_id = flow.account_id
actor_group_id = "group-456"
@@ -197,9 +206,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt
refute_receive {:expire_flow, ^flow_id, ^client_id, "new-resource-123"}
end
test "non-breaking-update: broadcasts :update_policy" do
@@ -233,6 +240,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do
describe "delete/1" do
test "broadcasts :delete_policy and :reject_access" do
flow = Fixtures.Flows.create_flow()
flow_id = flow.id
client_id = flow.client_id
policy_id = flow.policy_id
account_id = flow.account_id
actor_group_id = "group-456"
@@ -248,6 +257,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
:ok = PubSub.Flow.subscribe(flow_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
@@ -261,9 +271,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
end

View File

@@ -21,19 +21,16 @@ defmodule Domain.Events.Hooks.ResourceConnectionsTest do
describe "delete/1" do
test "returns :ok" do
flow = Fixtures.Flows.create_flow()
:ok = Domain.PubSub.Flow.subscribe(flow.id)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok ==
on_delete(%{"account_id" => flow.account_id, "resource_id" => flow.resource_id})
# TODO: WAL
# Remove this when flow removal is directly broadcasted
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
end

View File

@@ -68,6 +68,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do
:ok = PubSub.Flow.subscribe(flow.id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
@@ -78,12 +79,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:create_resource, ^resource_id}
@@ -91,6 +92,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do
:ok = PubSub.Flow.subscribe(flow.id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
@@ -101,12 +103,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:create_resource, ^resource_id}
@@ -114,6 +116,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do
:ok = PubSub.Flow.subscribe(flow.id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
@@ -124,12 +127,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:create_resource, ^resource_id}
@@ -137,6 +140,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do
:ok = PubSub.Flow.subscribe(flow.id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
@@ -147,12 +151,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
assert_receive {:create_resource, ^resource_id}
@@ -167,10 +171,15 @@ defmodule Domain.Events.Hooks.ResourcesTest do
assert :ok == on_update(old_data, data)
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
assert_receive {:update_resource, ^resource_id}
assert_receive {:update_resource, ^resource_id}
refute_receive {:delete_resource, ^resource_id}

View File

@@ -76,7 +76,7 @@ defmodule Domain.FlowsTest do
policy: policy,
subject: subject
} do
assert {:ok, fetched_resource, _flow} =
assert {:ok, fetched_resource, _flow, _expires_at} =
authorize_flow(client, gateway, resource.id, subject)
assert fetched_resource.id == resource.id
@@ -198,11 +198,11 @@ defmodule Domain.FlowsTest do
]
)
assert {:ok, _fetched_resource, flow} =
assert {:ok, _fetched_resource, flow, expires_at} =
authorize_flow(client, gateway, resource.id, subject)
assert flow.policy_id == policy.id
assert DateTime.diff(flow.expires_at, DateTime.new!(date, midnight)) < 5
assert DateTime.diff(expires_at, DateTime.new!(date, midnight)) < 5
end
test "creates a flow when all conditions for at least one of the policies are satisfied", %{
@@ -237,10 +237,11 @@ defmodule Domain.FlowsTest do
]
)
assert {:ok, _fetched_resource, flow} =
assert {:ok, _fetched_resource, flow, expires_at} =
authorize_flow(client, gateway, resource.id, subject)
assert flow.expires_at == subject.expires_at
assert flow.resource_id == resource.id
assert expires_at == subject.expires_at
end
test "creates a network flow for users", %{
@@ -256,7 +257,7 @@ defmodule Domain.FlowsTest do
Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group)
assert {:ok, _fetched_resource, %Flows.Flow{} = flow} =
assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} =
authorize_flow(client, gateway, resource.id, subject)
assert flow.policy_id == policy.id
@@ -267,7 +268,7 @@ defmodule Domain.FlowsTest do
assert flow.client_remote_ip.address == subject.context.remote_ip
assert flow.client_user_agent == subject.context.user_agent
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
assert flow.expires_at == subject.expires_at
assert expires_at == subject.expires_at
end
test "creates a network flow for service accounts", %{
@@ -285,7 +286,7 @@ defmodule Domain.FlowsTest do
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
assert {:ok, _fetched_resource, %Flows.Flow{} = flow} =
assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} =
authorize_flow(client, gateway, resource.id, subject)
assert flow.policy_id == policy.id
@@ -296,7 +297,7 @@ defmodule Domain.FlowsTest do
assert flow.client_remote_ip.address == subject.context.remote_ip
assert flow.client_user_agent == subject.context.user_agent
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
assert flow.expires_at == subject.expires_at
assert expires_at == subject.expires_at
end
test "does not return authorized access to deleted resources", %{
@@ -375,7 +376,7 @@ defmodule Domain.FlowsTest do
resource: resource,
subject: subject
} do
assert {:ok, resource, _flow} =
assert {:ok, resource, _flow, _expires_at} =
authorize_flow(client, gateway, resource.id, subject, preload: :connections)
assert Ecto.assoc_loaded?(resource.connections)
@@ -929,27 +930,36 @@ defmodule Domain.FlowsTest do
flow: flow,
actor_group: actor_group
} do
assert {:ok, [expired_flow]} = expire_flows_for(actor_group)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(actor_group)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for client identity", %{
flow: flow,
identity: identity
} do
assert {:ok, [expired_flow]} = expire_flows_for(identity)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(identity)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for client", %{
flow: flow,
client: client
} do
assert {:ok, [expired_flow]} = expire_flows_for(client)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(client)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
end
@@ -982,11 +992,13 @@ defmodule Domain.FlowsTest do
actor: actor,
policy: policy
} do
assert {:ok, [expired_flow]} =
expire_flows_for(actor.account_id, actor.id, policy.actor_group_id)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
assert :ok = expire_flows_for(actor.account_id, actor.id, policy.actor_group_id)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for actor", %{
@@ -994,18 +1006,24 @@ defmodule Domain.FlowsTest do
actor: actor,
subject: subject
} do
assert {:ok, [expired_flow]} = expire_flows_for(actor, subject)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(actor, subject)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for policy", %{
flow: flow,
policy: policy
} do
assert {:ok, [expired_flow]} = expire_flows_for_policy_id(policy.account_id, policy.id)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for_policy_id(policy.account_id, policy.id)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for resource", %{
@@ -1013,9 +1031,12 @@ defmodule Domain.FlowsTest do
resource: resource,
subject: subject
} do
assert {:ok, [expired_flow]} = expire_flows_for(resource, subject)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(resource, subject)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for policy actor group", %{
@@ -1023,9 +1044,12 @@ defmodule Domain.FlowsTest do
actor_group: actor_group,
subject: subject
} do
assert {:ok, [expired_flow]} = expire_flows_for(actor_group, subject)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(actor_group, subject)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for client identity", %{
@@ -1033,9 +1057,12 @@ defmodule Domain.FlowsTest do
identity: identity,
subject: subject
} do
assert {:ok, [expired_flow]} = expire_flows_for(identity, subject)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(identity, subject)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "expires flows for client identity provider", %{
@@ -1043,20 +1070,12 @@ defmodule Domain.FlowsTest do
provider: provider,
subject: subject
} do
assert {:ok, [expired_flow]} = expire_flows_for(provider, subject)
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
assert expired_flow.id == flow.id
end
test "updates flow expiration expires_at", %{
flow: flow,
actor: actor,
subject: subject
} do
assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
:ok = Domain.PubSub.Flow.subscribe(flow.id)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert :ok = expire_flows_for(provider, subject)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "returns error when subject has no permission to expire flows", %{
@@ -1077,16 +1096,16 @@ defmodule Domain.FlowsTest do
actor_group: actor_group,
subject: subject
} do
assert {:ok, [_expired_flow]} = expire_flows_for(resource, subject)
assert {:ok, []} = expire_flows_for(actor_group, subject)
assert {:ok, []} = expire_flows_for(resource, subject)
assert :ok = expire_flows_for(resource, subject)
assert :ok = expire_flows_for(actor_group, subject)
assert :ok = expire_flows_for(resource, subject)
end
test "does not expire flows outside of account", %{
resource: resource
} do
subject = Fixtures.Auth.create_subject()
assert {:ok, []} = expire_flows_for(resource, subject)
assert :ok = expire_flows_for(resource, subject)
end
end
end

View File

@@ -1531,12 +1531,15 @@ defmodule Domain.ResourcesTest do
subject: subject
} do
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
:ok = Domain.PubSub.Flow.subscribe(flow_id)
attrs = %{"name" => "foo"}
assert {:ok, _resource} = update_resource(resource, attrs, subject)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "allows to update connections", %{account: account, resource: resource, subject: subject} do
@@ -1551,6 +1554,11 @@ defmodule Domain.ResourcesTest do
gateway2 = Fixtures.Gateways.create_gateway(account: account)
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
:ok = Domain.PubSub.Flow.subscribe(flow_id)
attrs = %{
"connections" => [
@@ -1575,11 +1583,7 @@ defmodule Domain.ResourcesTest do
"resource_id" => resource.id
})
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
flow = Repo.reload(flow)
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not allow to remove all connections", %{resource: resource, subject: subject} do

View File

@@ -511,16 +511,22 @@ defmodule Domain.TokensTest do
identity: identity,
subject: subject
} do
Fixtures.Flows.create_flow(
account: account,
identity: identity,
subject: subject
)
flow =
Fixtures.Flows.create_flow(
account: account,
identity: identity,
subject: subject
)
:ok = Domain.PubSub.Flow.subscribe(flow.id)
assert {:ok, _token} = delete_token_for(subject)
expires_at = Repo.one(Domain.Flows.Flow).expires_at
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
flow_id = flow.id
client_id = flow.client_id
resource_id = flow.resource_id
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
end
test "does not delete tokens for other actors", %{account: account, subject: subject} do

View File

@@ -82,8 +82,7 @@ defmodule Domain.Fixtures.Flows do
account_id: account.id,
client_remote_ip: client.last_seen_remote_ip,
client_user_agent: client.last_seen_user_agent,
gateway_remote_ip: gateway.last_seen_remote_ip,
expires_at: subject.expires_at
gateway_remote_ip: gateway.last_seen_remote_ip
})
|> Repo.insert!()
end

View File

@@ -562,11 +562,6 @@ defmodule Web.Actors.Show do
<br />
<code class="text-xs">{flow.gateway_remote_ip}</code>
</:col>
<:col :let={flow} :if={@flow_activities_enabled?} label="activity" class="w-1/12">
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
Show
</.link>
</:col>
<:empty>
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
</:empty>

View File

@@ -319,11 +319,6 @@ defmodule Web.Clients.Show do
<br />
<code class="text-xs">{flow.gateway_remote_ip}</code>
</:col>
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
Show
</.link>
</:col>
<:empty>
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
</:empty>

View File

@@ -1,75 +0,0 @@
defmodule Web.Flows.DownloadActivities do
use Web, :controller
alias Domain.Flows
def download(conn, %{"id" => id}) do
with {:ok, flow} <- Flows.fetch_flow_by_id(id, conn.assigns.subject) do
conn
|> put_resp_content_type("text/csv")
|> put_resp_header(
"content-disposition",
"attachment; filename=\"flow-activities-#{flow.id}.csv\""
)
|> put_root_layout(false)
|> send_chunked(200)
|> send_csv_header()
|> stream_csv_body(flow)
else
{:error, _reason} -> raise Web.LiveErrors.NotFoundError
end
end
defp send_csv_header(conn) do
iodata =
Web.CSV.dump_to_iodata([~w[
window_started_at window_ended_at
destination
connectivity_type
rx_bytes tx_bytes blocked_tx_bytes
]])
{:ok, conn} = chunk(conn, iodata)
conn
end
defp stream_csv_body(conn, flow, cursor \\ nil) do
with {:ok, activities, activities_metadata} <-
Flows.list_flow_activities_for(
flow,
conn.assigns.subject,
page: [cursor: cursor, limit: 100]
),
{:ok, conn} <- stream_csv_rows(conn, activities) do
if activities_metadata.next_page_cursor do
stream_csv_body(conn, flow, activities_metadata.next_page_cursor)
else
conn
end
else
{:error, _reason} ->
conn
end
end
defp stream_csv_rows(conn, activities) do
iodata =
activities
|> Enum.map(fn activity ->
[
to_string(activity.window_started_at),
to_string(activity.window_ended_at),
to_string(activity.destination),
to_string(activity.connectivity_type),
activity.rx_bytes,
activity.tx_bytes,
activity.blocked_tx_bytes
]
end)
|> Web.CSV.dump_to_iodata()
case chunk(conn, iodata) do
{:ok, conn} -> {:ok, conn}
{:error, reason} -> {:error, reason}
end
end
end

View File

@@ -1,190 +0,0 @@
defmodule Web.Flows.Show do
use Web, :live_view
import Web.Policies.Components
alias Domain.{Accounts, Flows}
def mount(%{"id" => id}, _session, socket) do
with true <- Accounts.flow_activities_enabled?(socket.assigns.account),
{:ok, flow} <-
Flows.fetch_flow_by_id(id, socket.assigns.subject,
preload: [
policy: [:resource, :actor_group],
client: [],
gateway: [:group],
resource: []
]
) do
last_used_connectivity_type = get_last_used_connectivity_type(flow, socket.assigns.subject)
socket =
socket
|> assign(
page_title: "Flow #{flow.id}",
flow: flow,
last_used_connectivity_type: last_used_connectivity_type
)
|> assign_live_table("activities",
query_module: Flows.Activity.Query,
sortable_fields: [],
callback: &handle_activities_update!/2
)
{:ok, socket}
else
_other -> raise Web.LiveErrors.NotFoundError
end
end
defp get_last_used_connectivity_type(flow, subject) do
case Flows.fetch_last_activity_for(flow, subject) do
{:ok, activity} -> to_string(activity.connectivity_type)
_other -> "N/A"
end
end
def handle_params(params, uri, socket) do
socket = handle_live_tables_params(socket, params, uri)
{:noreply, socket}
end
def handle_activities_update!(socket, list_opts) do
with {:ok, activities, metadata} <-
Flows.list_flow_activities_for(socket.assigns.flow, socket.assigns.subject, list_opts) do
{:ok,
assign(socket,
activities: activities,
activities_metadata: metadata
)}
end
end
def render(assigns) do
~H"""
<.breadcrumbs account={@account}>
<.breadcrumb>Flows</.breadcrumb>
<.breadcrumb path={~p"/#{@account}/flows/#{@flow.id}"}>
{@flow.client.name} flow
</.breadcrumb>
</.breadcrumbs>
<.section>
<:title>
Flow for: <code>{@flow.client.name}</code>
</:title>
<:action>
<.button
navigate={~p"/#{@account}/flows/#{@flow}/activities.csv"}
icon="hero-arrow-down-on-square"
>
Export to CSV
</.button>
</:action>
<:content flash={@flash}>
<.vertical_table id="flow">
<.vertical_table_row>
<:label>Authorized At</:label>
<:value>
<.relative_datetime datetime={@flow.inserted_at} />
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Expires At</:label>
<:value>
<.relative_datetime datetime={@flow.expires_at} />
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Policy</:label>
<:value>
<.link navigate={~p"/#{@account}/policies/#{@flow.policy_id}"} class={link_style()}>
<.policy_name policy={@flow.policy} />
</.link>
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Client</:label>
<:value>
<.link navigate={~p"/#{@account}/clients/#{@flow.client_id}"} class={link_style()}>
{@flow.client.name}
</.link>
<div>Remote IP: {@flow.client_remote_ip}</div>
<div>User Agent: {@flow.client_user_agent}</div>
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Gateway</:label>
<:value>
<.link navigate={~p"/#{@account}/gateways/#{@flow.gateway_id}"} class={link_style()}>
{@flow.gateway.group.name}-{@flow.gateway.name}
</.link>
<div>
Remote IP: {@flow.gateway_remote_ip}
</div>
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Resource</:label>
<:value>
<.link navigate={~p"/#{@account}/resources/#{@flow.resource_id}"} class={link_style()}>
{@flow.resource.name}
</.link>
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Connectivity Type</:label>
<:value>
{@last_used_connectivity_type}
</:value>
</.vertical_table_row>
</.vertical_table>
</:content>
</.section>
<.section>
<:title>Metrics</:title>
<:help>
Pre-aggregated metrics for this flow.
</:help>
<:content>
<.live_table
id="activities"
rows={@activities}
row_id={&"activities-#{&1.id}"}
filters={@filters_by_table_id["activities"]}
filter={@filter_form_by_table_id["activities"]}
ordered_by={@order_by_table_id["activities"]}
metadata={@activities_metadata}
>
<:col :let={activity} label="started">
<.relative_datetime datetime={activity.window_started_at} />
</:col>
<:col :let={activity} label="ended">
<.relative_datetime datetime={activity.window_ended_at} />
</:col>
<:col :let={activity} label="destination">
{activity.destination}
</:col>
<:col :let={activity} label="connectivity type">
{activity.connectivity_type}
</:col>
<:col :let={activity} label="rx">
{Sizeable.filesize(activity.rx_bytes)}
</:col>
<:col :let={activity} label="tx">
{Sizeable.filesize(activity.tx_bytes)}
</:col>
<:col :let={activity} label="blocked tx">
{Sizeable.filesize(activity.blocked_tx_bytes)}
</:col>
<:empty>
<div class="text-center text-neutral-500 p-4">No metrics to display.</div>
</:empty>
</.live_table>
</:content>
</.section>
"""
end
def handle_event(event, params, socket) when event in ["paginate", "order_by", "filter"],
do: handle_live_table_event(event, params, socket)
end

View File

@@ -270,11 +270,6 @@ defmodule Web.Policies.Show do
<br />
<code class="text-xs">{flow.gateway_remote_ip}</code>
</:col>
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={link_style()}>
Show
</.link>
</:col>
<:empty>
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
</:empty>

View File

@@ -366,11 +366,6 @@ defmodule Web.Resources.Show do
<br />
<code class="text-xs">{flow.gateway_remote_ip}</code>
</:col>
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
Show
</.link>
</:col>
<:empty>
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
</:empty>

View File

@@ -207,11 +207,6 @@ defmodule Web.Router do
live "/:id", Show
end
scope "/flows", Flows do
live "/:id", Show
get "/:id/activities.csv", DownloadActivities, :download
end
scope "/settings", Settings do
scope "/account" do
live "/", Account

View File

@@ -1,224 +0,0 @@
defmodule Web.Live.Flows.ShowTest do
use Web.ConnCase, async: true
setup do
account = Fixtures.Accounts.create_account()
actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account)
identity = Fixtures.Auth.create_identity(account: account, actor: actor)
subject = Fixtures.Auth.create_subject(account: account, actor: actor, identity: identity)
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
flow = Fixtures.Flows.create_flow(account: account, client: client)
%{
account: account,
actor: actor,
identity: identity,
subject: subject,
client: client,
flow: flow
}
end
test "redirects to sign in page for unauthorized user", %{
account: account,
flow: flow,
conn: conn
} do
path = ~p"/#{account}/flows/#{flow}"
assert live(conn, path) ==
{:error,
{:redirect,
%{
to: ~p"/#{account}?#{%{redirect_to: path}}",
flash: %{"error" => "You must sign in to access this page."}
}}}
end
test "renders 404 error when flow activities are not enabled", %{
account: account,
identity: identity,
flow: flow,
conn: conn
} do
{:ok, account} =
Domain.Accounts.update_account(account, %{
features: %{
flow_activities: false
}
})
assert_raise Web.LiveErrors.NotFoundError, fn ->
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}") =~ "404"
end
end
test "renders breadcrumbs item", %{
account: account,
flow: flow,
client: client,
identity: identity,
conn: conn
} do
{:ok, _lv, html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
assert item = Floki.find(html, "[aria-label='Breadcrumb']")
breadcrumbs = String.trim(Floki.text(item))
assert breadcrumbs =~ "Flows"
assert breadcrumbs =~ "#{client.name} flow"
end
test "renders flows details", %{
account: account,
identity: identity,
flow: flow,
conn: conn
} do
flow =
Repo.preload(flow,
policy: [:resource, :actor_group],
client: [],
gateway: [:group],
resource: []
)
activity =
Fixtures.Flows.create_activity(
account: account,
flow: flow,
window_started_at: DateTime.truncate(flow.inserted_at, :second),
window_ended_at: DateTime.truncate(flow.expires_at, :second)
)
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
table =
lv
|> element("#flow")
|> render()
|> vertical_table_to_map()
assert table["authorized at"]
assert table["expires at"]
assert table["connectivity type"] =~ to_string(activity.connectivity_type)
assert table["client"] =~ flow.client.name
assert table["client"] =~ to_string(flow.client_remote_ip)
assert table["client"] =~ flow.client_user_agent
assert table["gateway"] =~ flow.gateway.name
assert table["gateway"] =~ to_string(flow.gateway_remote_ip)
assert table["resource"] =~ flow.resource.name
assert table["policy"] =~ flow.policy.resource.name
assert table["policy"] =~ flow.policy.actor_group.name
end
test "allows downloading activities", %{
account: account,
flow: flow,
identity: identity,
conn: conn
} do
activity =
Fixtures.Flows.create_activity(
account: account,
flow: flow,
window_started_at: DateTime.truncate(flow.inserted_at, :second),
window_ended_at: DateTime.truncate(flow.expires_at, :second)
)
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
lv
|> element("a", "Export to CSV")
|> render_click()
assert_redirected(lv, ~p"/#{account}/flows/#{flow}/activities.csv")
controller_conn = get(conn, ~p"/#{account}/flows/#{flow}/activities.csv")
assert redirected_to(controller_conn) =~ ~p"/#{account}"
assert flash(controller_conn, :error) == "You must sign in to access this page."
controller_conn =
conn
|> authorize_conn(identity)
|> get(~p"/#{account}/flows/#{flow}/activities.csv")
assert response = response(controller_conn, 200)
assert response
|> String.trim()
|> String.split("\n")
|> Enum.map(&String.split(&1, "\t")) ==
[
[
"window_started_at",
"window_ended_at",
"destination",
"connectivity_type",
"rx_bytes",
"tx_bytes",
"blocked_tx_bytes"
],
[
to_string(activity.window_started_at),
to_string(activity.window_ended_at),
to_string(activity.destination),
to_string(activity.connectivity_type),
to_string(activity.rx_bytes),
to_string(activity.tx_bytes),
to_string(activity.blocked_tx_bytes)
]
]
end
test "renders activities table", %{
account: account,
flow: flow,
identity: identity,
conn: conn
} do
activity =
Fixtures.Flows.create_activity(
account: account,
flow: flow,
window_started_at: DateTime.truncate(flow.inserted_at, :second),
window_ended_at: DateTime.truncate(flow.expires_at, :second),
tx_bytes: 1024 * 1024 * 1024 * 42
)
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
[row] =
lv
|> element("#activities")
|> render()
|> table_to_map()
assert row["started"]
assert row["ended"]
assert row["connectivity type"] == to_string(activity.connectivity_type)
assert row["destination"] == to_string(activity.destination)
assert row["rx"] == "#{activity.rx_bytes} B"
assert row["tx"] == "42 GB"
end
end

View File

@@ -59,7 +59,6 @@ config :domain, Domain.ChangeLogs.ReplicationConnection,
auth_providers
clients
flow_activities
flows
gateway_groups
gateways
policies
@@ -95,7 +94,6 @@ config :domain, Domain.Events.ReplicationConnection,
auth_providers
clients
flow_activities
flows
gateway_groups
gateways
policies