From 0b09d9f2f51a8dd11aeb8c3d6845534dc97ff19b Mon Sep 17 00:00:00 2001 From: Jamil Date: Fri, 27 Jun 2025 11:29:12 -0700 Subject: [PATCH] refactor(portal): don't rely on flows.expires_at (#9692) The `expires_at` column on the `flows` table was never used outside of the context in which the flow was created in the Client Channel. This ephemeral state, which is created in the `Domain.Flows.authorize_flow/4` function, is never read from the DB in any meaningful capacity, so it can be safely removed. The `expire_flows_for` family of functions now simply reads the needed fields from the flows table in order to broadcast `{:expire_flow, flow_id, client_id, resource_id}` directly to the subscribed entities. This PR is step 1 in removing the reliance on `Flows` to manage ephemeral access state. In a subsequent PR we will actually change the structure of what state is kept in the channel PIDs such that reliance on this Flows table will no longer be necessary. Additionally, in a few places, we were referencing a Flows.Show view that was never available in production, so this dead code has been removed. Lastly, the `flows` table subscription and associated hook processing has been completely removed as it is no longer needed. We've implemented in #9667 logic to remove publications from removed table subscriptions, so we can expect to get a couple ingest warnings when we deploy this as the `Hooks.Flows` processor no longer exists, and the WAL data may have lingering flows records in the queue. These can be safely ignored. --- .github/workflows/_integration_tests.yml | 2 +- .github/workflows/ci.yml | 2 +- elixir/apps/api/lib/api/client/channel.ex | 8 +- .../api/test/api/gateway/channel_test.exs | 6 +- .../change_logs/replication_connection.ex | 7 - elixir/apps/domain/lib/domain/clients.ex | 4 +- .../events/hooks/actor_group_memberships.ex | 5 +- .../domain/lib/domain/events/hooks/flows.ex | 51 ---- .../lib/domain/events/hooks/policies.ex | 12 +- .../events/hooks/resource_connections.ex | 7 +- .../lib/domain/events/hooks/resources.ex | 4 +- .../domain/events/replication_connection.ex | 1 - elixir/apps/domain/lib/domain/flows.ex | 38 ++- .../domain/lib/domain/flows/authorizer.ex | 2 + elixir/apps/domain/lib/domain/flows/flow.ex | 1 - .../domain/lib/domain/flows/flow/changeset.ex | 6 +- .../domain/lib/domain/flows/flow/query.ex | 38 --- elixir/apps/domain/lib/domain/tokens.ex | 24 +- ...50626175732_drop_expires_at_from_flows.exs | 15 ++ ...31_change_column_null_flows_expires_at.exs | 15 ++ elixir/apps/domain/priv/repo/seeds.exs | 2 +- .../apps/domain/test/domain/actors_test.exs | 48 ++-- .../jobs/sync_directory_test.exs | 14 +- .../jumpcloud/jobs/sync_directory_test.exs | 14 +- .../jobs/sync_directory_test.exs | 14 +- .../okta/jobs/sync_directory_test.exs | 14 +- elixir/apps/domain/test/domain/auth_test.exs | 75 +++--- .../apps/domain/test/domain/clients_test.exs | 8 +- .../test/domain/events/hooks/flows_test.exs | 93 -------- .../domain/events/hooks/policies_test.exs | 38 +-- .../hooks/resource_connections_test.exs | 13 +- .../domain/events/hooks/resources_test.exs | 35 ++- elixir/apps/domain/test/domain/flows_test.exs | 131 +++++----- .../domain/test/domain/resources_test.exs | 20 +- .../apps/domain/test/domain/tokens_test.exs | 20 +- .../domain/test/support/fixtures/flows.ex | 3 +- elixir/apps/web/lib/web/live/actors/show.ex | 5 - elixir/apps/web/lib/web/live/clients/show.ex | 5 - .../lib/web/live/flows/download_activities.ex | 75 ------ elixir/apps/web/lib/web/live/flows/show.ex | 190 --------------- elixir/apps/web/lib/web/live/policies/show.ex | 5 - .../apps/web/lib/web/live/resources/show.ex | 5 - elixir/apps/web/lib/web/router.ex | 5 - .../web/test/web/live/flows/show_test.exs | 224 ------------------ elixir/config/config.exs | 2 - 45 files changed, 386 insertions(+), 920 deletions(-) delete mode 100644 elixir/apps/domain/lib/domain/events/hooks/flows.ex create mode 100644 elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs create mode 100644 elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs delete mode 100644 elixir/apps/domain/test/domain/events/hooks/flows_test.exs delete mode 100644 elixir/apps/web/lib/web/live/flows/download_activities.ex delete mode 100644 elixir/apps/web/lib/web/live/flows/show.ex delete mode 100644 elixir/apps/web/test/web/live/flows/show_test.exs diff --git a/.github/workflows/_integration_tests.yml b/.github/workflows/_integration_tests.yml index 1c8aed2cc..479539184 100644 --- a/.github/workflows/_integration_tests.yml +++ b/.github/workflows/_integration_tests.yml @@ -120,7 +120,7 @@ jobs: with: project: firezone-staging - name: Seed database - run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed' + run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.migrate --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations && mix ecto.seed' - name: Start docker compose in the background run: | set -xe diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f706124c..fb1c5ad9c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -306,7 +306,7 @@ jobs: with: project: firezone-staging - name: Seed database - run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed' + run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations' - name: Start docker compose in the background run: | # We need to increase the log level to make sure that they don't hold off storm of packets diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index d22b5641e..a607368a8 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -643,7 +643,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.set_attribute(:gateways_count, length(gateways)), gateway = Gateways.load_balance_gateways(location, gateways, connected_gateway_ids), OpenTelemetry.Tracer.set_attribute(:gateway_id, gateway.id), - {:ok, resource, flow} <- + {:ok, resource, flow, expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -665,7 +665,7 @@ defmodule API.Client.Channel do client_id: socket.assigns.client.id, resource_id: resource.id, flow_id: flow.id, - authorization_expires_at: flow.expires_at, + authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key }, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -789,7 +789,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.reuse_connection", attributes: attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, flow} <- + {:ok, resource, flow, _expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -851,7 +851,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.request_connection", attributes: ctx_attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, flow} <- + {:ok, resource, flow, _expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 7f2209efc..c099907a4 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -224,7 +224,7 @@ defmodule API.Gateway.ChannelTest do assert_push "allow_access", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) @@ -647,7 +647,7 @@ defmodule API.Gateway.ChannelTest do assert_push "request_connection", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) @@ -878,7 +878,7 @@ defmodule API.Gateway.ChannelTest do assert_push "authorize_flow", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex index 7b62a623e..1d5a8852f 100644 --- a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -34,13 +34,6 @@ defmodule Domain.ChangeLogs.ReplicationConnection do :ok end - defp log(_op, _lsn, "flows", _old_data, _data) do - # TODO: WAL - # Flows are not logged to the change log as they are used only to trigger side effects which - # will be removed. Remove the flows table publication when that happens. - :ok - end - defp log(op, lsn, table, old_data, data) do attrs = %{ op: op, diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index f03b39190..f166d4620 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -212,8 +212,10 @@ defmodule Domain.Clients do preload: [:online?] ) |> case do + # TODO: WAL + # Broadcast flow side effects directly {:ok, client} -> - {:ok, _flows} = Flows.expire_flows_for(client) + :ok = Flows.expire_flows_for(client) {:ok, client} {:error, reason} -> diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex index 8085e6f08..80ff29d56 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex @@ -22,7 +22,10 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do Task.start(fn -> :ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id}) broadcast_access(:reject, actor_id, group_id) - {:ok, _flows} = Flows.expire_flows_for(account_id, actor_id, group_id) + + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for(account_id, actor_id, group_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex deleted file mode 100644 index d0806ab03..000000000 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ /dev/null @@ -1,51 +0,0 @@ -# TODO: WAL -# Move side-effects from flows to state table in clients and gateways -defmodule Domain.Events.Hooks.Flows do - @behaviour Domain.Events.Hooks - alias Domain.PubSub - require Logger - - @impl true - def on_insert(_data), do: :ok - - @impl true - def on_update( - _old_data, - %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id, - "expires_at" => expires_at - } = _data - ) do - if expired?(expires_at) do - # Flow has become expired - PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) - else - :ok - end - end - - # During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases - # where we might manually clear flows in a migration or some other mechanism. - @impl true - def on_delete( - %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } = _old_data - ) do - PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) - end - - defp expired?(nil), do: false - - defp expired?(expires_at) do - with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do - DateTime.compare(DateTime.utc_now(), expires_at) == :gt - else - _ -> false - end - end -end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex index 86aeeb2c8..0a2cd23ed 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/policies.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -69,7 +69,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:reject_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) :ok @@ -124,7 +126,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:allow_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) else Logger.warning("Breaking update ignored for policy as it is deleted or disabled", @@ -161,7 +165,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:reject_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex index 0510bd16a..5c762fa09 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex @@ -11,11 +11,10 @@ defmodule Domain.Events.Hooks.ResourceConnections do @impl true def on_delete(%{"account_id" => account_id, "resource_id" => resource_id} = _old_data) do # TODO: WAL - # The flow expires_at field is not used for any persistence-related reason. - # Remove it and broadcast directly to subscribed pids to remove the flow - # from their local state. This hook is called when resources change sites. + # Broadcast flow side effects directly + # This hook is called when resources change sites. Task.start(fn -> - {:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id) + :ok = Flows.expire_flows_for_resource_id(account_id, resource_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/resources.ex b/elixir/apps/domain/lib/domain/events/hooks/resources.ex index e8d5b66b2..7ca4f678e 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resources.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resources.ex @@ -39,7 +39,7 @@ defmodule Domain.Events.Hooks.Resources do old_filters != filters or old_ip_stack != ip_stack do # TODO: WAL - # Directly broadcast to subscribed pids to remove the flow + # Broadcast flow side effects directly Task.start(fn -> payload = {:delete_resource, resource_id} PubSub.Resource.broadcast(resource_id, payload) @@ -49,7 +49,7 @@ defmodule Domain.Events.Hooks.Resources do PubSub.Resource.broadcast(resource_id, payload) PubSub.Account.Resources.broadcast(account_id, payload) - {:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id) + :ok = Flows.expire_flows_for_resource_id(account_id, resource_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index e1401dd95..78731dcbc 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -19,7 +19,6 @@ defmodule Domain.Events.ReplicationConnection do "auth_providers" => Hooks.AuthProviders, "clients" => Hooks.Clients, "flow_activities" => Hooks.FlowActivities, - "flows" => Hooks.Flows, "gateway_groups" => Hooks.GatewayGroups, "gateways" => Hooks.Gateways, "policies" => Hooks.Policies, diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index a59aa99d5..c71a3ee32 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -43,12 +43,13 @@ defmodule Domain.Flows do account_id: account_id, client_remote_ip: client_remote_ip, client_user_agent: client_user_agent, - gateway_remote_ip: gateway_remote_ip, - expires_at: conformation_expires_at || expires_at + gateway_remote_ip: gateway_remote_ip }) |> Repo.insert!() - {:ok, resource, flow} + expires_at = conformation_expires_at || expires_at + + {:ok, resource, flow, expires_at} end end @@ -250,11 +251,32 @@ defmodule Domain.Flows do end defp expire_flows(queryable) do - {_count, flows} = - queryable - |> Flow.Query.expire() - |> Repo.update_all([]) + {:ok, :ok} = + Repo.transaction(fn -> + queryable + |> Repo.stream() + |> Stream.chunk_every(100) + |> Enum.each(fn chunk -> + Enum.each(chunk, &broadcast_flow_expiration/1) + end) + end) - {:ok, flows} + :ok + end + + defp broadcast_flow_expiration(flow) do + case Domain.PubSub.Flow.broadcast( + flow.id, + {:expire_flow, flow.id, flow.client_id, flow.resource_id} + ) do + :ok -> + :ok + + {:error, reason} -> + Logger.error("Failed to broadcast flow expiration", + reason: inspect(reason), + flow_id: flow.id + ) + end end end diff --git a/elixir/apps/domain/lib/domain/flows/authorizer.ex b/elixir/apps/domain/lib/domain/flows/authorizer.ex index e258dedf8..f28df64a3 100644 --- a/elixir/apps/domain/lib/domain/flows/authorizer.ex +++ b/elixir/apps/domain/lib/domain/flows/authorizer.ex @@ -1,3 +1,5 @@ +# TODO: Flows -> PolicyAuthorizations +# When this is renamed, "PolicyAuthorizations Authorizer" makes no sense, remove this module defmodule Domain.Flows.Authorizer do use Domain.Auth.Authorizer alias Domain.Flows.{Flow, Activity} diff --git a/elixir/apps/domain/lib/domain/flows/flow.ex b/elixir/apps/domain/lib/domain/flows/flow.ex index 93294d9b6..47d72108e 100644 --- a/elixir/apps/domain/lib/domain/flows/flow.ex +++ b/elixir/apps/domain/lib/domain/flows/flow.ex @@ -15,7 +15,6 @@ defmodule Domain.Flows.Flow do field :gateway_remote_ip, Domain.Types.IP - field :expires_at, :utc_datetime_usec timestamps(updated_at: false) end end diff --git a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex index 8c27469ff..e2e1df0c6 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex @@ -5,14 +5,12 @@ defmodule Domain.Flows.Flow.Changeset do @fields ~w[token_id policy_id client_id gateway_id resource_id account_id client_remote_ip client_user_agent - gateway_remote_ip - expires_at]a - @required_fields @fields -- ~w[expires_at]a + gateway_remote_ip]a def create(attrs) do %Flow{} |> cast(attrs, @fields) - |> validate_required(@required_fields) + |> validate_required(@fields) |> assoc_constraint(:token) |> assoc_constraint(:policy) |> assoc_constraint(:client) diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index 9828ad6b3..be0f9dd8e 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -5,10 +5,6 @@ defmodule Domain.Flows.Flow.Query do from(flows in Domain.Flows.Flow, as: :flows) end - def not_expired(queryable \\ all()) do - where(queryable, [flows: flows], flows.expires_at > fragment("timezone('UTC', NOW())")) - end - def by_id(queryable, id) do where(queryable, [flows: flows], flows.id == ^id) end @@ -61,17 +57,6 @@ defmodule Domain.Flows.Flow.Query do where(queryable, [flows: flows], flows.gateway_id == ^gateway_id) end - def expire(queryable) do - queryable - |> not_expired() - |> Ecto.Query.select([flows: flows], flows) - |> Ecto.Query.update([flows: flows], - set: [ - expires_at: fragment("LEAST(?, timezone('UTC', NOW()))", flows.expires_at) - ] - ) - end - def with_joined_policy(queryable) do with_named_binding(queryable, :policy, fn queryable, binding -> join(queryable, :inner, [flows: flows], policy in assoc(flows, ^binding), as: ^binding) @@ -100,27 +85,4 @@ defmodule Domain.Flows.Flow.Query do {:flows, :desc, :inserted_at}, {:flows, :asc, :id} ] - - @impl Domain.Repo.Query - def filters, - do: [ - %Domain.Repo.Filter{ - name: :expiration, - title: "Expired", - type: :string, - values: [ - {"Expired", "expired"}, - {"Not Expired", "not_expired"} - ], - fun: &filter_by_expired/2 - } - ] - - def filter_by_expired(queryable, "expired") do - {queryable, dynamic([flows: flows], flows.expires_at < fragment("timezone('UTC', NOW())"))} - end - - def filter_by_expired(queryable, "not_expired") do - {queryable, dynamic([flows: flows], flows.expires_at >= fragment("timezone('UTC', NOW())"))} - end end diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index 5c6bd14db..b5b3892ec 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -183,7 +183,9 @@ defmodule Domain.Tokens do ]} with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do - {:ok, _flows} = Domain.Flows.expire_flows_for(token, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(token, subject) Token.Query.not_deleted() |> Token.Query.by_id(token.id) @@ -203,7 +205,9 @@ defmodule Domain.Tokens do |> delete_tokens() |> case do {:ok, [token]} -> - {:ok, _flows} = Domain.Flows.expire_flows_for(token, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(token, subject) {:ok, token} {:ok, []} -> @@ -212,7 +216,9 @@ defmodule Domain.Tokens do end def delete_tokens_for(%Auth.Identity{} = identity) do - {:ok, _flows} = Domain.Flows.expire_flows_for(identity) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(identity) Token.Query.not_deleted() |> Token.Query.by_identity_id(identity.id) @@ -221,7 +227,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Actors.Actor{} = actor, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(actor, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(actor, subject) Token.Query.not_deleted() |> Token.Query.by_actor_id(actor.id) @@ -232,7 +240,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Auth.Identity{} = identity, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(identity, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(identity, subject) Token.Query.not_deleted() |> Token.Query.by_identity_id(identity.id) @@ -243,7 +253,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(provider, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(provider, subject) Token.Query.not_deleted() |> Token.Query.by_provider_id(provider.id) diff --git a/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs b/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs new file mode 100644 index 000000000..afbc9fd12 --- /dev/null +++ b/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs @@ -0,0 +1,15 @@ +defmodule Domain.Repo.Migrations.DropExpiresAtFromFlows do + use Ecto.Migration + + def up do + alter table(:flows) do + remove(:expires_at) + end + end + + def down do + alter table(:flows) do + add(:expires_at, :utc_datetime_usec, null: false) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs b/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs new file mode 100644 index 000000000..cc0d28997 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs @@ -0,0 +1,15 @@ +defmodule Domain.Repo.Migrations.ChangeColumnNullFlowsExpiresAt do + use Ecto.Migration + + def up do + alter table(:flows) do + modify(:expires_at, :utc_datetime_usec, null: true) + end + end + + def down do + alter table(:flows) do + modify(:expires_at, :utc_datetime_usec, null: false) + end + end +end diff --git a/elixir/apps/domain/priv/repo/seeds.exs b/elixir/apps/domain/priv/repo/seeds.exs index b83c2f323..19d25f0c9 100644 --- a/elixir/apps/domain/priv/repo/seeds.exs +++ b/elixir/apps/domain/priv/repo/seeds.exs @@ -1176,7 +1176,7 @@ defmodule Domain.Repo.Seeds do IO.puts("") - {:ok, _resource, flow} = + {:ok, _resource, flow, _expires_at} = Flows.authorize_flow( user_iphone, gateway1, diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 4b0da99dc..5da0422eb 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -1248,12 +1248,12 @@ defmodule Domain.ActorsTest do "group_id" => group2.id }) - # TODO: WAL - # Remove this when direct broadcast is implemented - Process.sleep(100) + :ok = Domain.PubSub.Flow.subscribe(flow.id) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "deletes memberships of removed groups", %{ @@ -3043,16 +3043,21 @@ defmodule Domain.ActorsTest do subject = Fixtures.Auth.create_subject(identity: identity) client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _actor} = disable_actor(actor, subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to disable the last admin actor" do @@ -3287,16 +3292,21 @@ defmodule Domain.ActorsTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _actor} = delete_actor(actor, subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to delete the last admin actor", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index e4f7b2804..b376387e7 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -629,6 +629,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -752,12 +754,16 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index 0823ecd86..df00b99d1 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -416,6 +416,8 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -516,12 +518,16 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index 7e7b1c8cb..7447fe8a9 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -464,6 +464,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -583,12 +585,16 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index 302ab9991..6971cdb23 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -708,6 +708,8 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -818,12 +820,16 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth_test.exs b/elixir/apps/domain/test/domain/auth_test.exs index efa56ff72..a7193da75 100644 --- a/elixir/apps/domain/test/domain/auth_test.exs +++ b/elixir/apps/domain/test/domain/auth_test.exs @@ -1062,16 +1062,21 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id assert {:ok, _provider} = disable_provider(provider, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to disable the last provider", %{ @@ -1282,16 +1287,21 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id assert {:ok, _provider} = delete_provider(provider, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to delete the last provider", %{ @@ -1882,6 +1892,8 @@ defmodule Domain.AuthTest do token_id: deleted_identity_token.id ) + :ok = Domain.PubSub.Flow.subscribe(deleted_identity_flow.id) + for n <- 1..4 do Fixtures.Auth.create_identity( account: account, @@ -1948,8 +1960,10 @@ defmodule Domain.AuthTest do assert deleted_identity_token.deleted_at # Expires flows for signed out user - reloaded_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "circuit breaker prevents mass deletions of identities", %{ @@ -2783,18 +2797,23 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - identity: identity, - actor: actor, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + identity: identity, + actor: actor, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id assert delete_identities_for(actor, subject) == :ok - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not remove identities that belong to another actor", %{ diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index abd8baa8f..be184c264 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1092,11 +1092,15 @@ defmodule Domain.ClientsTest do subject: subject ) + :ok = Domain.PubSub.Flow.subscribe(flow.id) + assert {:ok, client} = verify_client(client, subject) assert {:ok, _client} = remove_client_verification(client, subject) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when subject has no permission to verify clients", %{ diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs deleted file mode 100644 index ca99f0262..000000000 --- a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs +++ /dev/null @@ -1,93 +0,0 @@ -defmodule Domain.Events.Hooks.FlowsTest do - use Domain.DataCase, async: true - import Domain.Events.Hooks.Flows - - setup do - %{old_data: %{}, data: %{}} - end - - describe "insert/1" do - test "returns :ok", %{data: data} do - assert :ok == on_insert(data) - end - end - - describe "update/2" do - test "broadcasts expire_flow if flow is expired" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_update(old_data, data) - assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - - test "does not broadcast expire_flow if flow is not expired" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_update(old_data, data) - refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - - test "does not receive broadcast when not subscribed" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - assert :ok == on_update(old_data, data) - refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - end - - describe "delete/1" do - test "broadcasts expire_flow" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_delete(old_data) - assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - end -end diff --git a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs index 9cfa6c3f1..2654571b1 100644 --- a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs @@ -57,6 +57,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do test "disable: broadcasts :disable_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -72,6 +74,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -85,13 +88,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "soft-delete: broadcasts :delete_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -107,6 +110,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -120,13 +124,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -142,12 +146,17 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "resource_id", "new-resource-123") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this when side effects are directly broadcasted + Process.sleep(100) + assert_receive {:delete_policy, ^policy_id} assert_receive {:delete_policy, ^policy_id} assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} @@ -160,13 +169,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "breaking update: disabled policy has no side-effects" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -197,9 +206,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt + refute_receive {:expire_flow, ^flow_id, ^client_id, "new-resource-123"} end test "non-breaking-update: broadcasts :update_policy" do @@ -233,6 +240,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do describe "delete/1" do test "broadcasts :delete_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -248,6 +257,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -261,9 +271,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs index 88e90e780..deb79ac32 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs @@ -21,19 +21,16 @@ defmodule Domain.Events.Hooks.ResourceConnectionsTest do describe "delete/1" do test "returns :ok" do flow = Fixtures.Flows.create_flow() + :ok = Domain.PubSub.Flow.subscribe(flow.id) - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id assert :ok == on_delete(%{"account_id" => flow.account_id, "resource_id" => flow.resource_id}) - # TODO: WAL - # Remove this when flow removal is directly broadcasted - Process.sleep(100) - - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs index 593cd3aed..26eb93575 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs @@ -68,6 +68,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -78,12 +79,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -91,6 +92,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -101,12 +103,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -114,6 +116,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -124,12 +127,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -137,6 +140,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -147,12 +151,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -167,10 +171,15 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:update_resource, ^resource_id} assert_receive {:update_resource, ^resource_id} refute_receive {:delete_resource, ^resource_id} diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 9a4adc43f..928f87c76 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -76,7 +76,7 @@ defmodule Domain.FlowsTest do policy: policy, subject: subject } do - assert {:ok, fetched_resource, _flow} = + assert {:ok, fetched_resource, _flow, _expires_at} = authorize_flow(client, gateway, resource.id, subject) assert fetched_resource.id == resource.id @@ -198,11 +198,11 @@ defmodule Domain.FlowsTest do ] ) - assert {:ok, _fetched_resource, flow} = + assert {:ok, _fetched_resource, flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id - assert DateTime.diff(flow.expires_at, DateTime.new!(date, midnight)) < 5 + assert DateTime.diff(expires_at, DateTime.new!(date, midnight)) < 5 end test "creates a flow when all conditions for at least one of the policies are satisfied", %{ @@ -237,10 +237,11 @@ defmodule Domain.FlowsTest do ] ) - assert {:ok, _fetched_resource, flow} = + assert {:ok, _fetched_resource, flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) - assert flow.expires_at == subject.expires_at + assert flow.resource_id == resource.id + assert expires_at == subject.expires_at end test "creates a network flow for users", %{ @@ -256,7 +257,7 @@ defmodule Domain.FlowsTest do Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) - assert {:ok, _fetched_resource, %Flows.Flow{} = flow} = + assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id @@ -267,7 +268,7 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip - assert flow.expires_at == subject.expires_at + assert expires_at == subject.expires_at end test "creates a network flow for service accounts", %{ @@ -285,7 +286,7 @@ defmodule Domain.FlowsTest do client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity) - assert {:ok, _fetched_resource, %Flows.Flow{} = flow} = + assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id @@ -296,7 +297,7 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip - assert flow.expires_at == subject.expires_at + assert expires_at == subject.expires_at end test "does not return authorized access to deleted resources", %{ @@ -375,7 +376,7 @@ defmodule Domain.FlowsTest do resource: resource, subject: subject } do - assert {:ok, resource, _flow} = + assert {:ok, resource, _flow, _expires_at} = authorize_flow(client, gateway, resource.id, subject, preload: :connections) assert Ecto.assoc_loaded?(resource.connections) @@ -929,27 +930,36 @@ defmodule Domain.FlowsTest do flow: flow, actor_group: actor_group } do - assert {:ok, [expired_flow]} = expire_flows_for(actor_group) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor_group) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity", %{ flow: flow, identity: identity } do - assert {:ok, [expired_flow]} = expire_flows_for(identity) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(identity) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client", %{ flow: flow, client: client } do - assert {:ok, [expired_flow]} = expire_flows_for(client) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(client) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end @@ -982,11 +992,13 @@ defmodule Domain.FlowsTest do actor: actor, policy: policy } do - assert {:ok, [expired_flow]} = - expire_flows_for(actor.account_id, actor.id, policy.actor_group_id) + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + assert :ok = expire_flows_for(actor.account_id, actor.id, policy.actor_group_id) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for actor", %{ @@ -994,18 +1006,24 @@ defmodule Domain.FlowsTest do actor: actor, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(actor, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for policy", %{ flow: flow, policy: policy } do - assert {:ok, [expired_flow]} = expire_flows_for_policy_id(policy.account_id, policy.id) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for_policy_id(policy.account_id, policy.id) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for resource", %{ @@ -1013,9 +1031,12 @@ defmodule Domain.FlowsTest do resource: resource, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(resource, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(resource, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for policy actor group", %{ @@ -1023,9 +1044,12 @@ defmodule Domain.FlowsTest do actor_group: actor_group, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(actor_group, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor_group, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity", %{ @@ -1033,9 +1057,12 @@ defmodule Domain.FlowsTest do identity: identity, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(identity, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(identity, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity provider", %{ @@ -1043,20 +1070,12 @@ defmodule Domain.FlowsTest do provider: provider, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(provider, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id - end - - test "updates flow expiration expires_at", %{ - flow: flow, - actor: actor, - subject: subject - } do - assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject) - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(provider, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when subject has no permission to expire flows", %{ @@ -1077,16 +1096,16 @@ defmodule Domain.FlowsTest do actor_group: actor_group, subject: subject } do - assert {:ok, [_expired_flow]} = expire_flows_for(resource, subject) - assert {:ok, []} = expire_flows_for(actor_group, subject) - assert {:ok, []} = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(actor_group, subject) + assert :ok = expire_flows_for(resource, subject) end test "does not expire flows outside of account", %{ resource: resource } do subject = Fixtures.Auth.create_subject() - assert {:ok, []} = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(resource, subject) end end end diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index eca7c2286..c36bd0a6e 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -1531,12 +1531,15 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + :ok = Domain.PubSub.Flow.subscribe(flow_id) attrs = %{"name" => "foo"} assert {:ok, _resource} = update_resource(resource, attrs, subject) - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "allows to update connections", %{account: account, resource: resource, subject: subject} do @@ -1551,6 +1554,11 @@ defmodule Domain.ResourcesTest do gateway2 = Fixtures.Gateways.create_gateway(account: account) flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + :ok = Domain.PubSub.Flow.subscribe(flow_id) attrs = %{ "connections" => [ @@ -1575,11 +1583,7 @@ defmodule Domain.ResourcesTest do "resource_id" => resource.id }) - # TODO: WAL - # Remove this after direct broadcast - Process.sleep(100) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not allow to remove all connections", %{resource: resource, subject: subject} do diff --git a/elixir/apps/domain/test/domain/tokens_test.exs b/elixir/apps/domain/test/domain/tokens_test.exs index a07a41d72..77bba4cf3 100644 --- a/elixir/apps/domain/test/domain/tokens_test.exs +++ b/elixir/apps/domain/test/domain/tokens_test.exs @@ -511,16 +511,22 @@ defmodule Domain.TokensTest do identity: identity, subject: subject } do - Fixtures.Flows.create_flow( - account: account, - identity: identity, - subject: subject - ) + flow = + Fixtures.Flows.create_flow( + account: account, + identity: identity, + subject: subject + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _token} = delete_token_for(subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not delete tokens for other actors", %{account: account, subject: subject} do diff --git a/elixir/apps/domain/test/support/fixtures/flows.ex b/elixir/apps/domain/test/support/fixtures/flows.ex index 982b13ed2..c463bc60d 100644 --- a/elixir/apps/domain/test/support/fixtures/flows.ex +++ b/elixir/apps/domain/test/support/fixtures/flows.ex @@ -82,8 +82,7 @@ defmodule Domain.Fixtures.Flows do account_id: account.id, client_remote_ip: client.last_seen_remote_ip, client_user_agent: client.last_seen_user_agent, - gateway_remote_ip: gateway.last_seen_remote_ip, - expires_at: subject.expires_at + gateway_remote_ip: gateway.last_seen_remote_ip }) |> Repo.insert!() end diff --git a/elixir/apps/web/lib/web/live/actors/show.ex b/elixir/apps/web/lib/web/live/actors/show.ex index 80c3f1a01..f6359adaa 100644 --- a/elixir/apps/web/lib/web/live/actors/show.ex +++ b/elixir/apps/web/lib/web/live/actors/show.ex @@ -562,11 +562,6 @@ defmodule Web.Actors.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity" class="w-1/12"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/clients/show.ex b/elixir/apps/web/lib/web/live/clients/show.ex index 2dffd85d2..7531df302 100644 --- a/elixir/apps/web/lib/web/live/clients/show.ex +++ b/elixir/apps/web/lib/web/live/clients/show.ex @@ -319,11 +319,6 @@ defmodule Web.Clients.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/flows/download_activities.ex b/elixir/apps/web/lib/web/live/flows/download_activities.ex deleted file mode 100644 index fd0ed3485..000000000 --- a/elixir/apps/web/lib/web/live/flows/download_activities.ex +++ /dev/null @@ -1,75 +0,0 @@ -defmodule Web.Flows.DownloadActivities do - use Web, :controller - alias Domain.Flows - - def download(conn, %{"id" => id}) do - with {:ok, flow} <- Flows.fetch_flow_by_id(id, conn.assigns.subject) do - conn - |> put_resp_content_type("text/csv") - |> put_resp_header( - "content-disposition", - "attachment; filename=\"flow-activities-#{flow.id}.csv\"" - ) - |> put_root_layout(false) - |> send_chunked(200) - |> send_csv_header() - |> stream_csv_body(flow) - else - {:error, _reason} -> raise Web.LiveErrors.NotFoundError - end - end - - defp send_csv_header(conn) do - iodata = - Web.CSV.dump_to_iodata([~w[ - window_started_at window_ended_at - destination - connectivity_type - rx_bytes tx_bytes blocked_tx_bytes - ]]) - - {:ok, conn} = chunk(conn, iodata) - conn - end - - defp stream_csv_body(conn, flow, cursor \\ nil) do - with {:ok, activities, activities_metadata} <- - Flows.list_flow_activities_for( - flow, - conn.assigns.subject, - page: [cursor: cursor, limit: 100] - ), - {:ok, conn} <- stream_csv_rows(conn, activities) do - if activities_metadata.next_page_cursor do - stream_csv_body(conn, flow, activities_metadata.next_page_cursor) - else - conn - end - else - {:error, _reason} -> - conn - end - end - - defp stream_csv_rows(conn, activities) do - iodata = - activities - |> Enum.map(fn activity -> - [ - to_string(activity.window_started_at), - to_string(activity.window_ended_at), - to_string(activity.destination), - to_string(activity.connectivity_type), - activity.rx_bytes, - activity.tx_bytes, - activity.blocked_tx_bytes - ] - end) - |> Web.CSV.dump_to_iodata() - - case chunk(conn, iodata) do - {:ok, conn} -> {:ok, conn} - {:error, reason} -> {:error, reason} - end - end -end diff --git a/elixir/apps/web/lib/web/live/flows/show.ex b/elixir/apps/web/lib/web/live/flows/show.ex deleted file mode 100644 index 535854867..000000000 --- a/elixir/apps/web/lib/web/live/flows/show.ex +++ /dev/null @@ -1,190 +0,0 @@ -defmodule Web.Flows.Show do - use Web, :live_view - import Web.Policies.Components - alias Domain.{Accounts, Flows} - - def mount(%{"id" => id}, _session, socket) do - with true <- Accounts.flow_activities_enabled?(socket.assigns.account), - {:ok, flow} <- - Flows.fetch_flow_by_id(id, socket.assigns.subject, - preload: [ - policy: [:resource, :actor_group], - client: [], - gateway: [:group], - resource: [] - ] - ) do - last_used_connectivity_type = get_last_used_connectivity_type(flow, socket.assigns.subject) - - socket = - socket - |> assign( - page_title: "Flow #{flow.id}", - flow: flow, - last_used_connectivity_type: last_used_connectivity_type - ) - |> assign_live_table("activities", - query_module: Flows.Activity.Query, - sortable_fields: [], - callback: &handle_activities_update!/2 - ) - - {:ok, socket} - else - _other -> raise Web.LiveErrors.NotFoundError - end - end - - defp get_last_used_connectivity_type(flow, subject) do - case Flows.fetch_last_activity_for(flow, subject) do - {:ok, activity} -> to_string(activity.connectivity_type) - _other -> "N/A" - end - end - - def handle_params(params, uri, socket) do - socket = handle_live_tables_params(socket, params, uri) - {:noreply, socket} - end - - def handle_activities_update!(socket, list_opts) do - with {:ok, activities, metadata} <- - Flows.list_flow_activities_for(socket.assigns.flow, socket.assigns.subject, list_opts) do - {:ok, - assign(socket, - activities: activities, - activities_metadata: metadata - )} - end - end - - def render(assigns) do - ~H""" - <.breadcrumbs account={@account}> - <.breadcrumb>Flows - <.breadcrumb path={~p"/#{@account}/flows/#{@flow.id}"}> - {@flow.client.name} flow - - - - <.section> - <:title> - Flow for: {@flow.client.name} - - <:action> - <.button - navigate={~p"/#{@account}/flows/#{@flow}/activities.csv"} - icon="hero-arrow-down-on-square" - > - Export to CSV - - - <:content flash={@flash}> - <.vertical_table id="flow"> - <.vertical_table_row> - <:label>Authorized At - <:value> - <.relative_datetime datetime={@flow.inserted_at} /> - - - <.vertical_table_row> - <:label>Expires At - <:value> - <.relative_datetime datetime={@flow.expires_at} /> - - - <.vertical_table_row> - <:label>Policy - <:value> - <.link navigate={~p"/#{@account}/policies/#{@flow.policy_id}"} class={link_style()}> - <.policy_name policy={@flow.policy} /> - - - - <.vertical_table_row> - <:label>Client - <:value> - <.link navigate={~p"/#{@account}/clients/#{@flow.client_id}"} class={link_style()}> - {@flow.client.name} - -
Remote IP: {@flow.client_remote_ip}
-
User Agent: {@flow.client_user_agent}
- - - <.vertical_table_row> - <:label>Gateway - <:value> - <.link navigate={~p"/#{@account}/gateways/#{@flow.gateway_id}"} class={link_style()}> - {@flow.gateway.group.name}-{@flow.gateway.name} - -
- Remote IP: {@flow.gateway_remote_ip} -
- - - <.vertical_table_row> - <:label>Resource - <:value> - <.link navigate={~p"/#{@account}/resources/#{@flow.resource_id}"} class={link_style()}> - {@flow.resource.name} - - - - <.vertical_table_row> - <:label>Connectivity Type - <:value> - {@last_used_connectivity_type} - - - - - - - <.section> - <:title>Metrics - <:help> - Pre-aggregated metrics for this flow. - - <:content> - <.live_table - id="activities" - rows={@activities} - row_id={&"activities-#{&1.id}"} - filters={@filters_by_table_id["activities"]} - filter={@filter_form_by_table_id["activities"]} - ordered_by={@order_by_table_id["activities"]} - metadata={@activities_metadata} - > - <:col :let={activity} label="started"> - <.relative_datetime datetime={activity.window_started_at} /> - - <:col :let={activity} label="ended"> - <.relative_datetime datetime={activity.window_ended_at} /> - - <:col :let={activity} label="destination"> - {activity.destination} - - <:col :let={activity} label="connectivity type"> - {activity.connectivity_type} - - <:col :let={activity} label="rx"> - {Sizeable.filesize(activity.rx_bytes)} - - <:col :let={activity} label="tx"> - {Sizeable.filesize(activity.tx_bytes)} - - <:col :let={activity} label="blocked tx"> - {Sizeable.filesize(activity.blocked_tx_bytes)} - - <:empty> -
No metrics to display.
- - - - - """ - end - - def handle_event(event, params, socket) when event in ["paginate", "order_by", "filter"], - do: handle_live_table_event(event, params, socket) -end diff --git a/elixir/apps/web/lib/web/live/policies/show.ex b/elixir/apps/web/lib/web/live/policies/show.ex index adec338c5..44751af53 100644 --- a/elixir/apps/web/lib/web/live/policies/show.ex +++ b/elixir/apps/web/lib/web/live/policies/show.ex @@ -270,11 +270,6 @@ defmodule Web.Policies.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={link_style()}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/resources/show.ex b/elixir/apps/web/lib/web/live/resources/show.ex index d4c206a51..faa9ecf14 100644 --- a/elixir/apps/web/lib/web/live/resources/show.ex +++ b/elixir/apps/web/lib/web/live/resources/show.ex @@ -366,11 +366,6 @@ defmodule Web.Resources.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/router.ex b/elixir/apps/web/lib/web/router.ex index d94eea368..89ab70cda 100644 --- a/elixir/apps/web/lib/web/router.ex +++ b/elixir/apps/web/lib/web/router.ex @@ -207,11 +207,6 @@ defmodule Web.Router do live "/:id", Show end - scope "/flows", Flows do - live "/:id", Show - get "/:id/activities.csv", DownloadActivities, :download - end - scope "/settings", Settings do scope "/account" do live "/", Account diff --git a/elixir/apps/web/test/web/live/flows/show_test.exs b/elixir/apps/web/test/web/live/flows/show_test.exs deleted file mode 100644 index 830e03ac9..000000000 --- a/elixir/apps/web/test/web/live/flows/show_test.exs +++ /dev/null @@ -1,224 +0,0 @@ -defmodule Web.Live.Flows.ShowTest do - use Web.ConnCase, async: true - - setup do - account = Fixtures.Accounts.create_account() - actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account) - identity = Fixtures.Auth.create_identity(account: account, actor: actor) - subject = Fixtures.Auth.create_subject(account: account, actor: actor, identity: identity) - - client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity) - flow = Fixtures.Flows.create_flow(account: account, client: client) - - %{ - account: account, - actor: actor, - identity: identity, - subject: subject, - client: client, - flow: flow - } - end - - test "redirects to sign in page for unauthorized user", %{ - account: account, - flow: flow, - conn: conn - } do - path = ~p"/#{account}/flows/#{flow}" - - assert live(conn, path) == - {:error, - {:redirect, - %{ - to: ~p"/#{account}?#{%{redirect_to: path}}", - flash: %{"error" => "You must sign in to access this page."} - }}} - end - - test "renders 404 error when flow activities are not enabled", %{ - account: account, - identity: identity, - flow: flow, - conn: conn - } do - {:ok, account} = - Domain.Accounts.update_account(account, %{ - features: %{ - flow_activities: false - } - }) - - assert_raise Web.LiveErrors.NotFoundError, fn -> - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") =~ "404" - end - end - - test "renders breadcrumbs item", %{ - account: account, - flow: flow, - client: client, - identity: identity, - conn: conn - } do - {:ok, _lv, html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - assert item = Floki.find(html, "[aria-label='Breadcrumb']") - breadcrumbs = String.trim(Floki.text(item)) - assert breadcrumbs =~ "Flows" - assert breadcrumbs =~ "#{client.name} flow" - end - - test "renders flows details", %{ - account: account, - identity: identity, - flow: flow, - conn: conn - } do - flow = - Repo.preload(flow, - policy: [:resource, :actor_group], - client: [], - gateway: [:group], - resource: [] - ) - - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second) - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - table = - lv - |> element("#flow") - |> render() - |> vertical_table_to_map() - - assert table["authorized at"] - assert table["expires at"] - - assert table["connectivity type"] =~ to_string(activity.connectivity_type) - - assert table["client"] =~ flow.client.name - assert table["client"] =~ to_string(flow.client_remote_ip) - assert table["client"] =~ flow.client_user_agent - - assert table["gateway"] =~ flow.gateway.name - assert table["gateway"] =~ to_string(flow.gateway_remote_ip) - - assert table["resource"] =~ flow.resource.name - - assert table["policy"] =~ flow.policy.resource.name - assert table["policy"] =~ flow.policy.actor_group.name - end - - test "allows downloading activities", %{ - account: account, - flow: flow, - identity: identity, - conn: conn - } do - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second) - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - lv - |> element("a", "Export to CSV") - |> render_click() - - assert_redirected(lv, ~p"/#{account}/flows/#{flow}/activities.csv") - - controller_conn = get(conn, ~p"/#{account}/flows/#{flow}/activities.csv") - assert redirected_to(controller_conn) =~ ~p"/#{account}" - assert flash(controller_conn, :error) == "You must sign in to access this page." - - controller_conn = - conn - |> authorize_conn(identity) - |> get(~p"/#{account}/flows/#{flow}/activities.csv") - - assert response = response(controller_conn, 200) - - assert response - |> String.trim() - |> String.split("\n") - |> Enum.map(&String.split(&1, "\t")) == - [ - [ - "window_started_at", - "window_ended_at", - "destination", - "connectivity_type", - "rx_bytes", - "tx_bytes", - "blocked_tx_bytes" - ], - [ - to_string(activity.window_started_at), - to_string(activity.window_ended_at), - to_string(activity.destination), - to_string(activity.connectivity_type), - to_string(activity.rx_bytes), - to_string(activity.tx_bytes), - to_string(activity.blocked_tx_bytes) - ] - ] - end - - test "renders activities table", %{ - account: account, - flow: flow, - identity: identity, - conn: conn - } do - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second), - tx_bytes: 1024 * 1024 * 1024 * 42 - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - [row] = - lv - |> element("#activities") - |> render() - |> table_to_map() - - assert row["started"] - assert row["ended"] - - assert row["connectivity type"] == to_string(activity.connectivity_type) - assert row["destination"] == to_string(activity.destination) - assert row["rx"] == "#{activity.rx_bytes} B" - assert row["tx"] == "42 GB" - end -end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 46abe73b0..84b133d8d 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -59,7 +59,6 @@ config :domain, Domain.ChangeLogs.ReplicationConnection, auth_providers clients flow_activities - flows gateway_groups gateways policies @@ -95,7 +94,6 @@ config :domain, Domain.Events.ReplicationConnection, auth_providers clients flow_activities - flows gateway_groups gateways policies