diff --git a/.github/workflows/_integration_tests.yml b/.github/workflows/_integration_tests.yml index 1c8aed2cc..479539184 100644 --- a/.github/workflows/_integration_tests.yml +++ b/.github/workflows/_integration_tests.yml @@ -120,7 +120,7 @@ jobs: with: project: firezone-staging - name: Seed database - run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed' + run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.migrate --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations && mix ecto.seed' - name: Start docker compose in the background run: | set -xe diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f706124c..fb1c5ad9c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -306,7 +306,7 @@ jobs: with: project: firezone-staging - name: Seed database - run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed' + run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations' - name: Start docker compose in the background run: | # We need to increase the log level to make sure that they don't hold off storm of packets diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index d22b5641e..a607368a8 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -643,7 +643,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.set_attribute(:gateways_count, length(gateways)), gateway = Gateways.load_balance_gateways(location, gateways, connected_gateway_ids), OpenTelemetry.Tracer.set_attribute(:gateway_id, gateway.id), - {:ok, resource, flow} <- + {:ok, resource, flow, expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -665,7 +665,7 @@ defmodule API.Client.Channel do client_id: socket.assigns.client.id, resource_id: resource.id, flow_id: flow.id, - authorization_expires_at: flow.expires_at, + authorization_expires_at: expires_at, ice_credentials: ice_credentials, preshared_key: preshared_key }, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -789,7 +789,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.reuse_connection", attributes: attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, flow} <- + {:ok, resource, flow, _expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -851,7 +851,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.request_connection", attributes: ctx_attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, flow} <- + {:ok, resource, flow, _expires_at} <- Flows.authorize_flow( socket.assigns.client, gateway, diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 7f2209efc..c099907a4 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -224,7 +224,7 @@ defmodule API.Gateway.ChannelTest do assert_push "allow_access", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) @@ -647,7 +647,7 @@ defmodule API.Gateway.ChannelTest do assert_push "request_connection", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) @@ -878,7 +878,7 @@ defmodule API.Gateway.ChannelTest do assert_push "authorize_flow", %{} - {:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject) + assert :ok = Domain.Flows.expire_flows_for(resource, subject) send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id}) diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex index 7b62a623e..1d5a8852f 100644 --- a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -34,13 +34,6 @@ defmodule Domain.ChangeLogs.ReplicationConnection do :ok end - defp log(_op, _lsn, "flows", _old_data, _data) do - # TODO: WAL - # Flows are not logged to the change log as they are used only to trigger side effects which - # will be removed. Remove the flows table publication when that happens. - :ok - end - defp log(op, lsn, table, old_data, data) do attrs = %{ op: op, diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index f03b39190..f166d4620 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -212,8 +212,10 @@ defmodule Domain.Clients do preload: [:online?] ) |> case do + # TODO: WAL + # Broadcast flow side effects directly {:ok, client} -> - {:ok, _flows} = Flows.expire_flows_for(client) + :ok = Flows.expire_flows_for(client) {:ok, client} {:error, reason} -> diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex index 8085e6f08..80ff29d56 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex @@ -22,7 +22,10 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do Task.start(fn -> :ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id}) broadcast_access(:reject, actor_id, group_id) - {:ok, _flows} = Flows.expire_flows_for(account_id, actor_id, group_id) + + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for(account_id, actor_id, group_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex deleted file mode 100644 index d0806ab03..000000000 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ /dev/null @@ -1,51 +0,0 @@ -# TODO: WAL -# Move side-effects from flows to state table in clients and gateways -defmodule Domain.Events.Hooks.Flows do - @behaviour Domain.Events.Hooks - alias Domain.PubSub - require Logger - - @impl true - def on_insert(_data), do: :ok - - @impl true - def on_update( - _old_data, - %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id, - "expires_at" => expires_at - } = _data - ) do - if expired?(expires_at) do - # Flow has become expired - PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) - else - :ok - end - end - - # During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases - # where we might manually clear flows in a migration or some other mechanism. - @impl true - def on_delete( - %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } = _old_data - ) do - PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) - end - - defp expired?(nil), do: false - - defp expired?(expires_at) do - with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do - DateTime.compare(DateTime.utc_now(), expires_at) == :gt - else - _ -> false - end - end -end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex index 86aeeb2c8..0a2cd23ed 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/policies.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -69,7 +69,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:reject_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) :ok @@ -124,7 +126,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:allow_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) else Logger.warning("Breaking update ignored for policy as it is deleted or disabled", @@ -161,7 +165,9 @@ defmodule Domain.Events.Hooks.Policies do payload = {:reject_access, policy_id, actor_group_id, resource_id} :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) - Flows.expire_flows_for_policy_id(account_id, policy_id) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Flows.expire_flows_for_policy_id(account_id, policy_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex index 0510bd16a..5c762fa09 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex @@ -11,11 +11,10 @@ defmodule Domain.Events.Hooks.ResourceConnections do @impl true def on_delete(%{"account_id" => account_id, "resource_id" => resource_id} = _old_data) do # TODO: WAL - # The flow expires_at field is not used for any persistence-related reason. - # Remove it and broadcast directly to subscribed pids to remove the flow - # from their local state. This hook is called when resources change sites. + # Broadcast flow side effects directly + # This hook is called when resources change sites. Task.start(fn -> - {:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id) + :ok = Flows.expire_flows_for_resource_id(account_id, resource_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/hooks/resources.ex b/elixir/apps/domain/lib/domain/events/hooks/resources.ex index e8d5b66b2..7ca4f678e 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resources.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resources.ex @@ -39,7 +39,7 @@ defmodule Domain.Events.Hooks.Resources do old_filters != filters or old_ip_stack != ip_stack do # TODO: WAL - # Directly broadcast to subscribed pids to remove the flow + # Broadcast flow side effects directly Task.start(fn -> payload = {:delete_resource, resource_id} PubSub.Resource.broadcast(resource_id, payload) @@ -49,7 +49,7 @@ defmodule Domain.Events.Hooks.Resources do PubSub.Resource.broadcast(resource_id, payload) PubSub.Account.Resources.broadcast(account_id, payload) - {:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id) + :ok = Flows.expire_flows_for_resource_id(account_id, resource_id) end) :ok diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index e1401dd95..78731dcbc 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -19,7 +19,6 @@ defmodule Domain.Events.ReplicationConnection do "auth_providers" => Hooks.AuthProviders, "clients" => Hooks.Clients, "flow_activities" => Hooks.FlowActivities, - "flows" => Hooks.Flows, "gateway_groups" => Hooks.GatewayGroups, "gateways" => Hooks.Gateways, "policies" => Hooks.Policies, diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index a59aa99d5..c71a3ee32 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -43,12 +43,13 @@ defmodule Domain.Flows do account_id: account_id, client_remote_ip: client_remote_ip, client_user_agent: client_user_agent, - gateway_remote_ip: gateway_remote_ip, - expires_at: conformation_expires_at || expires_at + gateway_remote_ip: gateway_remote_ip }) |> Repo.insert!() - {:ok, resource, flow} + expires_at = conformation_expires_at || expires_at + + {:ok, resource, flow, expires_at} end end @@ -250,11 +251,32 @@ defmodule Domain.Flows do end defp expire_flows(queryable) do - {_count, flows} = - queryable - |> Flow.Query.expire() - |> Repo.update_all([]) + {:ok, :ok} = + Repo.transaction(fn -> + queryable + |> Repo.stream() + |> Stream.chunk_every(100) + |> Enum.each(fn chunk -> + Enum.each(chunk, &broadcast_flow_expiration/1) + end) + end) - {:ok, flows} + :ok + end + + defp broadcast_flow_expiration(flow) do + case Domain.PubSub.Flow.broadcast( + flow.id, + {:expire_flow, flow.id, flow.client_id, flow.resource_id} + ) do + :ok -> + :ok + + {:error, reason} -> + Logger.error("Failed to broadcast flow expiration", + reason: inspect(reason), + flow_id: flow.id + ) + end end end diff --git a/elixir/apps/domain/lib/domain/flows/authorizer.ex b/elixir/apps/domain/lib/domain/flows/authorizer.ex index e258dedf8..f28df64a3 100644 --- a/elixir/apps/domain/lib/domain/flows/authorizer.ex +++ b/elixir/apps/domain/lib/domain/flows/authorizer.ex @@ -1,3 +1,5 @@ +# TODO: Flows -> PolicyAuthorizations +# When this is renamed, "PolicyAuthorizations Authorizer" makes no sense, remove this module defmodule Domain.Flows.Authorizer do use Domain.Auth.Authorizer alias Domain.Flows.{Flow, Activity} diff --git a/elixir/apps/domain/lib/domain/flows/flow.ex b/elixir/apps/domain/lib/domain/flows/flow.ex index 93294d9b6..47d72108e 100644 --- a/elixir/apps/domain/lib/domain/flows/flow.ex +++ b/elixir/apps/domain/lib/domain/flows/flow.ex @@ -15,7 +15,6 @@ defmodule Domain.Flows.Flow do field :gateway_remote_ip, Domain.Types.IP - field :expires_at, :utc_datetime_usec timestamps(updated_at: false) end end diff --git a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex index 8c27469ff..e2e1df0c6 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/changeset.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/changeset.ex @@ -5,14 +5,12 @@ defmodule Domain.Flows.Flow.Changeset do @fields ~w[token_id policy_id client_id gateway_id resource_id account_id client_remote_ip client_user_agent - gateway_remote_ip - expires_at]a - @required_fields @fields -- ~w[expires_at]a + gateway_remote_ip]a def create(attrs) do %Flow{} |> cast(attrs, @fields) - |> validate_required(@required_fields) + |> validate_required(@fields) |> assoc_constraint(:token) |> assoc_constraint(:policy) |> assoc_constraint(:client) diff --git a/elixir/apps/domain/lib/domain/flows/flow/query.ex b/elixir/apps/domain/lib/domain/flows/flow/query.ex index 9828ad6b3..be0f9dd8e 100644 --- a/elixir/apps/domain/lib/domain/flows/flow/query.ex +++ b/elixir/apps/domain/lib/domain/flows/flow/query.ex @@ -5,10 +5,6 @@ defmodule Domain.Flows.Flow.Query do from(flows in Domain.Flows.Flow, as: :flows) end - def not_expired(queryable \\ all()) do - where(queryable, [flows: flows], flows.expires_at > fragment("timezone('UTC', NOW())")) - end - def by_id(queryable, id) do where(queryable, [flows: flows], flows.id == ^id) end @@ -61,17 +57,6 @@ defmodule Domain.Flows.Flow.Query do where(queryable, [flows: flows], flows.gateway_id == ^gateway_id) end - def expire(queryable) do - queryable - |> not_expired() - |> Ecto.Query.select([flows: flows], flows) - |> Ecto.Query.update([flows: flows], - set: [ - expires_at: fragment("LEAST(?, timezone('UTC', NOW()))", flows.expires_at) - ] - ) - end - def with_joined_policy(queryable) do with_named_binding(queryable, :policy, fn queryable, binding -> join(queryable, :inner, [flows: flows], policy in assoc(flows, ^binding), as: ^binding) @@ -100,27 +85,4 @@ defmodule Domain.Flows.Flow.Query do {:flows, :desc, :inserted_at}, {:flows, :asc, :id} ] - - @impl Domain.Repo.Query - def filters, - do: [ - %Domain.Repo.Filter{ - name: :expiration, - title: "Expired", - type: :string, - values: [ - {"Expired", "expired"}, - {"Not Expired", "not_expired"} - ], - fun: &filter_by_expired/2 - } - ] - - def filter_by_expired(queryable, "expired") do - {queryable, dynamic([flows: flows], flows.expires_at < fragment("timezone('UTC', NOW())"))} - end - - def filter_by_expired(queryable, "not_expired") do - {queryable, dynamic([flows: flows], flows.expires_at >= fragment("timezone('UTC', NOW())"))} - end end diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index 5c6bd14db..b5b3892ec 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -183,7 +183,9 @@ defmodule Domain.Tokens do ]} with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do - {:ok, _flows} = Domain.Flows.expire_flows_for(token, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(token, subject) Token.Query.not_deleted() |> Token.Query.by_id(token.id) @@ -203,7 +205,9 @@ defmodule Domain.Tokens do |> delete_tokens() |> case do {:ok, [token]} -> - {:ok, _flows} = Domain.Flows.expire_flows_for(token, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(token, subject) {:ok, token} {:ok, []} -> @@ -212,7 +216,9 @@ defmodule Domain.Tokens do end def delete_tokens_for(%Auth.Identity{} = identity) do - {:ok, _flows} = Domain.Flows.expire_flows_for(identity) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(identity) Token.Query.not_deleted() |> Token.Query.by_identity_id(identity.id) @@ -221,7 +227,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Actors.Actor{} = actor, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(actor, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(actor, subject) Token.Query.not_deleted() |> Token.Query.by_actor_id(actor.id) @@ -232,7 +240,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Auth.Identity{} = identity, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(identity, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(identity, subject) Token.Query.not_deleted() |> Token.Query.by_identity_id(identity.id) @@ -243,7 +253,9 @@ defmodule Domain.Tokens do def delete_tokens_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do - {:ok, _flows} = Domain.Flows.expire_flows_for(provider, subject) + # TODO: WAL + # Broadcast flow side effects directly + :ok = Domain.Flows.expire_flows_for(provider, subject) Token.Query.not_deleted() |> Token.Query.by_provider_id(provider.id) diff --git a/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs b/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs new file mode 100644 index 000000000..afbc9fd12 --- /dev/null +++ b/elixir/apps/domain/priv/repo/manual_migrations/20250626175732_drop_expires_at_from_flows.exs @@ -0,0 +1,15 @@ +defmodule Domain.Repo.Migrations.DropExpiresAtFromFlows do + use Ecto.Migration + + def up do + alter table(:flows) do + remove(:expires_at) + end + end + + def down do + alter table(:flows) do + add(:expires_at, :utc_datetime_usec, null: false) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs b/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs new file mode 100644 index 000000000..cc0d28997 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250626175731_change_column_null_flows_expires_at.exs @@ -0,0 +1,15 @@ +defmodule Domain.Repo.Migrations.ChangeColumnNullFlowsExpiresAt do + use Ecto.Migration + + def up do + alter table(:flows) do + modify(:expires_at, :utc_datetime_usec, null: true) + end + end + + def down do + alter table(:flows) do + modify(:expires_at, :utc_datetime_usec, null: false) + end + end +end diff --git a/elixir/apps/domain/priv/repo/seeds.exs b/elixir/apps/domain/priv/repo/seeds.exs index b83c2f323..19d25f0c9 100644 --- a/elixir/apps/domain/priv/repo/seeds.exs +++ b/elixir/apps/domain/priv/repo/seeds.exs @@ -1176,7 +1176,7 @@ defmodule Domain.Repo.Seeds do IO.puts("") - {:ok, _resource, flow} = + {:ok, _resource, flow, _expires_at} = Flows.authorize_flow( user_iphone, gateway1, diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 4b0da99dc..5da0422eb 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -1248,12 +1248,12 @@ defmodule Domain.ActorsTest do "group_id" => group2.id }) - # TODO: WAL - # Remove this when direct broadcast is implemented - Process.sleep(100) + :ok = Domain.PubSub.Flow.subscribe(flow.id) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "deletes memberships of removed groups", %{ @@ -3043,16 +3043,21 @@ defmodule Domain.ActorsTest do subject = Fixtures.Auth.create_subject(identity: identity) client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _actor} = disable_actor(actor, subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to disable the last admin actor" do @@ -3287,16 +3292,21 @@ defmodule Domain.ActorsTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _actor} = delete_actor(actor, subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to delete the last admin actor", %{ diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index e4f7b2804..b376387e7 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -629,6 +629,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -752,12 +754,16 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index 0823ecd86..df00b99d1 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -416,6 +416,8 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -516,12 +518,16 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index 7e7b1c8cb..7447fe8a9 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -464,6 +464,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -583,12 +585,16 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do Process.sleep(100) # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index 302ab9991..6971cdb23 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -708,6 +708,8 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) + :ok = PubSub.Flow.subscribe(deleted_identity_flow.id) + :ok = PubSub.Flow.subscribe(deleted_group_flow.id) :ok = PubSub.Actor.Memberships.subscribe(actor.id) :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) @@ -818,12 +820,16 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} # Deleted policies expire all flows authorized by them - deleted_group_flow = Repo.reload(deleted_group_flow) - assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_group_flow.id + client_id = deleted_group_flow.client_id + resource_id = deleted_group_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Expires flows for signed out user - deleted_identity_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} # Should not do anything else refute_received {:allow_access, _policy_id, _group_id, _resource_id} diff --git a/elixir/apps/domain/test/domain/auth_test.exs b/elixir/apps/domain/test/domain/auth_test.exs index efa56ff72..a7193da75 100644 --- a/elixir/apps/domain/test/domain/auth_test.exs +++ b/elixir/apps/domain/test/domain/auth_test.exs @@ -1062,16 +1062,21 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id assert {:ok, _provider} = disable_provider(provider, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to disable the last provider", %{ @@ -1282,16 +1287,21 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id assert {:ok, _provider} = delete_provider(provider, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when trying to delete the last provider", %{ @@ -1882,6 +1892,8 @@ defmodule Domain.AuthTest do token_id: deleted_identity_token.id ) + :ok = Domain.PubSub.Flow.subscribe(deleted_identity_flow.id) + for n <- 1..4 do Fixtures.Auth.create_identity( account: account, @@ -1948,8 +1960,10 @@ defmodule Domain.AuthTest do assert deleted_identity_token.deleted_at # Expires flows for signed out user - reloaded_flow = Repo.reload(deleted_identity_flow) - assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt + flow_id = deleted_identity_flow.id + client_id = deleted_identity_flow.client_id + resource_id = deleted_identity_flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "circuit breaker prevents mass deletions of identities", %{ @@ -2783,18 +2797,23 @@ defmodule Domain.AuthTest do } do client = Fixtures.Clients.create_client(account: account, identity: identity) - Fixtures.Flows.create_flow( - account: account, - identity: identity, - actor: actor, - subject: subject, - client: client - ) + flow = + Fixtures.Flows.create_flow( + account: account, + identity: identity, + actor: actor, + subject: subject, + client: client + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) + + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id assert delete_identities_for(actor, subject) == :ok - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not remove identities that belong to another actor", %{ diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index abd8baa8f..be184c264 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1092,11 +1092,15 @@ defmodule Domain.ClientsTest do subject: subject ) + :ok = Domain.PubSub.Flow.subscribe(flow.id) + assert {:ok, client} = verify_client(client, subject) assert {:ok, _client} = remove_client_verification(client, subject) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + flow_id = flow.id + client_id = client.id + resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when subject has no permission to verify clients", %{ diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs deleted file mode 100644 index ca99f0262..000000000 --- a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs +++ /dev/null @@ -1,93 +0,0 @@ -defmodule Domain.Events.Hooks.FlowsTest do - use Domain.DataCase, async: true - import Domain.Events.Hooks.Flows - - setup do - %{old_data: %{}, data: %{}} - end - - describe "insert/1" do - test "returns :ok", %{data: data} do - assert :ok == on_insert(data) - end - end - - describe "update/2" do - test "broadcasts expire_flow if flow is expired" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_update(old_data, data) - assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - - test "does not broadcast expire_flow if flow is not expired" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_update(old_data, data) - refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - - test "does not receive broadcast when not subscribed" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{} - - data = %{ - "expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(), - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - assert :ok == on_update(old_data, data) - refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - end - - describe "delete/1" do - test "broadcasts expire_flow" do - flow_id = "flow_123" - client_id = "client_123" - resource_id = "resource_123" - - old_data = %{ - "id" => flow_id, - "client_id" => client_id, - "resource_id" => resource_id - } - - :ok = Domain.PubSub.Flow.subscribe(flow_id) - - assert :ok == on_delete(old_data) - assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} - end - end -end diff --git a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs index 9cfa6c3f1..2654571b1 100644 --- a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs @@ -57,6 +57,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do test "disable: broadcasts :disable_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -72,6 +74,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -85,13 +88,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "soft-delete: broadcasts :delete_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -107,6 +110,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -120,13 +124,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -142,12 +146,17 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "resource_id", "new-resource-123") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this when side effects are directly broadcasted + Process.sleep(100) + assert_receive {:delete_policy, ^policy_id} assert_receive {:delete_policy, ^policy_id} assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} @@ -160,13 +169,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "breaking update: disabled policy has no side-effects" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -197,9 +206,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt + refute_receive {:expire_flow, ^flow_id, ^client_id, "new-resource-123"} end test "non-breaking-update: broadcasts :update_policy" do @@ -233,6 +240,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do describe "delete/1" do test "broadcasts :delete_policy and :reject_access" do flow = Fixtures.Flows.create_flow() + flow_id = flow.id + client_id = flow.client_id policy_id = flow.policy_id account_id = flow.account_id actor_group_id = "group-456" @@ -248,6 +257,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + :ok = PubSub.Flow.subscribe(flow_id) :ok = PubSub.Policy.subscribe(policy_id) :ok = PubSub.Account.Policies.subscribe(account_id) :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) @@ -261,9 +271,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs index 88e90e780..deb79ac32 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs @@ -21,19 +21,16 @@ defmodule Domain.Events.Hooks.ResourceConnectionsTest do describe "delete/1" do test "returns :ok" do flow = Fixtures.Flows.create_flow() + :ok = Domain.PubSub.Flow.subscribe(flow.id) - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id assert :ok == on_delete(%{"account_id" => flow.account_id, "resource_id" => flow.resource_id}) - # TODO: WAL - # Remove this when flow removal is directly broadcasted - Process.sleep(100) - - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs index 593cd3aed..26eb93575 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs @@ -68,6 +68,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -78,12 +79,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -91,6 +92,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -101,12 +103,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -114,6 +116,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -124,12 +127,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -137,6 +140,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do + :ok = PubSub.Flow.subscribe(flow.id) :ok = PubSub.Resource.subscribe(flow.resource_id) :ok = PubSub.Account.Resources.subscribe(flow.account_id) @@ -147,12 +151,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do # TODO: WAL # Remove this after direct broadcast Process.sleep(100) - flow = Repo.reload(flow) - - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} @@ -167,10 +171,15 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) - assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + flow_id = flow.id + client_id = flow.client_id resource_id = flow.resource_id + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} assert_receive {:update_resource, ^resource_id} assert_receive {:update_resource, ^resource_id} refute_receive {:delete_resource, ^resource_id} diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 9a4adc43f..928f87c76 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -76,7 +76,7 @@ defmodule Domain.FlowsTest do policy: policy, subject: subject } do - assert {:ok, fetched_resource, _flow} = + assert {:ok, fetched_resource, _flow, _expires_at} = authorize_flow(client, gateway, resource.id, subject) assert fetched_resource.id == resource.id @@ -198,11 +198,11 @@ defmodule Domain.FlowsTest do ] ) - assert {:ok, _fetched_resource, flow} = + assert {:ok, _fetched_resource, flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id - assert DateTime.diff(flow.expires_at, DateTime.new!(date, midnight)) < 5 + assert DateTime.diff(expires_at, DateTime.new!(date, midnight)) < 5 end test "creates a flow when all conditions for at least one of the policies are satisfied", %{ @@ -237,10 +237,11 @@ defmodule Domain.FlowsTest do ] ) - assert {:ok, _fetched_resource, flow} = + assert {:ok, _fetched_resource, flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) - assert flow.expires_at == subject.expires_at + assert flow.resource_id == resource.id + assert expires_at == subject.expires_at end test "creates a network flow for users", %{ @@ -256,7 +257,7 @@ defmodule Domain.FlowsTest do Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group) - assert {:ok, _fetched_resource, %Flows.Flow{} = flow} = + assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id @@ -267,7 +268,7 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip - assert flow.expires_at == subject.expires_at + assert expires_at == subject.expires_at end test "creates a network flow for service accounts", %{ @@ -285,7 +286,7 @@ defmodule Domain.FlowsTest do client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity) - assert {:ok, _fetched_resource, %Flows.Flow{} = flow} = + assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} = authorize_flow(client, gateway, resource.id, subject) assert flow.policy_id == policy.id @@ -296,7 +297,7 @@ defmodule Domain.FlowsTest do assert flow.client_remote_ip.address == subject.context.remote_ip assert flow.client_user_agent == subject.context.user_agent assert flow.gateway_remote_ip == gateway.last_seen_remote_ip - assert flow.expires_at == subject.expires_at + assert expires_at == subject.expires_at end test "does not return authorized access to deleted resources", %{ @@ -375,7 +376,7 @@ defmodule Domain.FlowsTest do resource: resource, subject: subject } do - assert {:ok, resource, _flow} = + assert {:ok, resource, _flow, _expires_at} = authorize_flow(client, gateway, resource.id, subject, preload: :connections) assert Ecto.assoc_loaded?(resource.connections) @@ -929,27 +930,36 @@ defmodule Domain.FlowsTest do flow: flow, actor_group: actor_group } do - assert {:ok, [expired_flow]} = expire_flows_for(actor_group) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor_group) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity", %{ flow: flow, identity: identity } do - assert {:ok, [expired_flow]} = expire_flows_for(identity) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(identity) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client", %{ flow: flow, client: client } do - assert {:ok, [expired_flow]} = expire_flows_for(client) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(client) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end end @@ -982,11 +992,13 @@ defmodule Domain.FlowsTest do actor: actor, policy: policy } do - assert {:ok, [expired_flow]} = - expire_flows_for(actor.account_id, actor.id, policy.actor_group_id) + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + assert :ok = expire_flows_for(actor.account_id, actor.id, policy.actor_group_id) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for actor", %{ @@ -994,18 +1006,24 @@ defmodule Domain.FlowsTest do actor: actor, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(actor, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for policy", %{ flow: flow, policy: policy } do - assert {:ok, [expired_flow]} = expire_flows_for_policy_id(policy.account_id, policy.id) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for_policy_id(policy.account_id, policy.id) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for resource", %{ @@ -1013,9 +1031,12 @@ defmodule Domain.FlowsTest do resource: resource, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(resource, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(resource, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for policy actor group", %{ @@ -1023,9 +1044,12 @@ defmodule Domain.FlowsTest do actor_group: actor_group, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(actor_group, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(actor_group, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity", %{ @@ -1033,9 +1057,12 @@ defmodule Domain.FlowsTest do identity: identity, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(identity, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(identity, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "expires flows for client identity provider", %{ @@ -1043,20 +1070,12 @@ defmodule Domain.FlowsTest do provider: provider, subject: subject } do - assert {:ok, [expired_flow]} = expire_flows_for(provider, subject) - assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 - assert expired_flow.id == flow.id - end - - test "updates flow expiration expires_at", %{ - flow: flow, - actor: actor, - subject: subject - } do - assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject) - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + :ok = Domain.PubSub.Flow.subscribe(flow.id) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + assert :ok = expire_flows_for(provider, subject) + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "returns error when subject has no permission to expire flows", %{ @@ -1077,16 +1096,16 @@ defmodule Domain.FlowsTest do actor_group: actor_group, subject: subject } do - assert {:ok, [_expired_flow]} = expire_flows_for(resource, subject) - assert {:ok, []} = expire_flows_for(actor_group, subject) - assert {:ok, []} = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(actor_group, subject) + assert :ok = expire_flows_for(resource, subject) end test "does not expire flows outside of account", %{ resource: resource } do subject = Fixtures.Auth.create_subject() - assert {:ok, []} = expire_flows_for(resource, subject) + assert :ok = expire_flows_for(resource, subject) end end end diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index eca7c2286..c36bd0a6e 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -1531,12 +1531,15 @@ defmodule Domain.ResourcesTest do subject: subject } do flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + :ok = Domain.PubSub.Flow.subscribe(flow_id) attrs = %{"name" => "foo"} assert {:ok, _resource} = update_resource(resource, attrs, subject) - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt + refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "allows to update connections", %{account: account, resource: resource, subject: subject} do @@ -1551,6 +1554,11 @@ defmodule Domain.ResourcesTest do gateway2 = Fixtures.Gateways.create_gateway(account: account) flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject) + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + :ok = Domain.PubSub.Flow.subscribe(flow_id) attrs = %{ "connections" => [ @@ -1575,11 +1583,7 @@ defmodule Domain.ResourcesTest do "resource_id" => resource.id }) - # TODO: WAL - # Remove this after direct broadcast - Process.sleep(100) - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not allow to remove all connections", %{resource: resource, subject: subject} do diff --git a/elixir/apps/domain/test/domain/tokens_test.exs b/elixir/apps/domain/test/domain/tokens_test.exs index a07a41d72..77bba4cf3 100644 --- a/elixir/apps/domain/test/domain/tokens_test.exs +++ b/elixir/apps/domain/test/domain/tokens_test.exs @@ -511,16 +511,22 @@ defmodule Domain.TokensTest do identity: identity, subject: subject } do - Fixtures.Flows.create_flow( - account: account, - identity: identity, - subject: subject - ) + flow = + Fixtures.Flows.create_flow( + account: account, + identity: identity, + subject: subject + ) + + :ok = Domain.PubSub.Flow.subscribe(flow.id) assert {:ok, _token} = delete_token_for(subject) - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 + flow_id = flow.id + client_id = flow.client_id + resource_id = flow.resource_id + + assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} end test "does not delete tokens for other actors", %{account: account, subject: subject} do diff --git a/elixir/apps/domain/test/support/fixtures/flows.ex b/elixir/apps/domain/test/support/fixtures/flows.ex index 982b13ed2..c463bc60d 100644 --- a/elixir/apps/domain/test/support/fixtures/flows.ex +++ b/elixir/apps/domain/test/support/fixtures/flows.ex @@ -82,8 +82,7 @@ defmodule Domain.Fixtures.Flows do account_id: account.id, client_remote_ip: client.last_seen_remote_ip, client_user_agent: client.last_seen_user_agent, - gateway_remote_ip: gateway.last_seen_remote_ip, - expires_at: subject.expires_at + gateway_remote_ip: gateway.last_seen_remote_ip }) |> Repo.insert!() end diff --git a/elixir/apps/web/lib/web/live/actors/show.ex b/elixir/apps/web/lib/web/live/actors/show.ex index 80c3f1a01..f6359adaa 100644 --- a/elixir/apps/web/lib/web/live/actors/show.ex +++ b/elixir/apps/web/lib/web/live/actors/show.ex @@ -562,11 +562,6 @@ defmodule Web.Actors.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity" class="w-1/12"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/clients/show.ex b/elixir/apps/web/lib/web/live/clients/show.ex index 2dffd85d2..7531df302 100644 --- a/elixir/apps/web/lib/web/live/clients/show.ex +++ b/elixir/apps/web/lib/web/live/clients/show.ex @@ -319,11 +319,6 @@ defmodule Web.Clients.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/flows/download_activities.ex b/elixir/apps/web/lib/web/live/flows/download_activities.ex deleted file mode 100644 index fd0ed3485..000000000 --- a/elixir/apps/web/lib/web/live/flows/download_activities.ex +++ /dev/null @@ -1,75 +0,0 @@ -defmodule Web.Flows.DownloadActivities do - use Web, :controller - alias Domain.Flows - - def download(conn, %{"id" => id}) do - with {:ok, flow} <- Flows.fetch_flow_by_id(id, conn.assigns.subject) do - conn - |> put_resp_content_type("text/csv") - |> put_resp_header( - "content-disposition", - "attachment; filename=\"flow-activities-#{flow.id}.csv\"" - ) - |> put_root_layout(false) - |> send_chunked(200) - |> send_csv_header() - |> stream_csv_body(flow) - else - {:error, _reason} -> raise Web.LiveErrors.NotFoundError - end - end - - defp send_csv_header(conn) do - iodata = - Web.CSV.dump_to_iodata([~w[ - window_started_at window_ended_at - destination - connectivity_type - rx_bytes tx_bytes blocked_tx_bytes - ]]) - - {:ok, conn} = chunk(conn, iodata) - conn - end - - defp stream_csv_body(conn, flow, cursor \\ nil) do - with {:ok, activities, activities_metadata} <- - Flows.list_flow_activities_for( - flow, - conn.assigns.subject, - page: [cursor: cursor, limit: 100] - ), - {:ok, conn} <- stream_csv_rows(conn, activities) do - if activities_metadata.next_page_cursor do - stream_csv_body(conn, flow, activities_metadata.next_page_cursor) - else - conn - end - else - {:error, _reason} -> - conn - end - end - - defp stream_csv_rows(conn, activities) do - iodata = - activities - |> Enum.map(fn activity -> - [ - to_string(activity.window_started_at), - to_string(activity.window_ended_at), - to_string(activity.destination), - to_string(activity.connectivity_type), - activity.rx_bytes, - activity.tx_bytes, - activity.blocked_tx_bytes - ] - end) - |> Web.CSV.dump_to_iodata() - - case chunk(conn, iodata) do - {:ok, conn} -> {:ok, conn} - {:error, reason} -> {:error, reason} - end - end -end diff --git a/elixir/apps/web/lib/web/live/flows/show.ex b/elixir/apps/web/lib/web/live/flows/show.ex deleted file mode 100644 index 535854867..000000000 --- a/elixir/apps/web/lib/web/live/flows/show.ex +++ /dev/null @@ -1,190 +0,0 @@ -defmodule Web.Flows.Show do - use Web, :live_view - import Web.Policies.Components - alias Domain.{Accounts, Flows} - - def mount(%{"id" => id}, _session, socket) do - with true <- Accounts.flow_activities_enabled?(socket.assigns.account), - {:ok, flow} <- - Flows.fetch_flow_by_id(id, socket.assigns.subject, - preload: [ - policy: [:resource, :actor_group], - client: [], - gateway: [:group], - resource: [] - ] - ) do - last_used_connectivity_type = get_last_used_connectivity_type(flow, socket.assigns.subject) - - socket = - socket - |> assign( - page_title: "Flow #{flow.id}", - flow: flow, - last_used_connectivity_type: last_used_connectivity_type - ) - |> assign_live_table("activities", - query_module: Flows.Activity.Query, - sortable_fields: [], - callback: &handle_activities_update!/2 - ) - - {:ok, socket} - else - _other -> raise Web.LiveErrors.NotFoundError - end - end - - defp get_last_used_connectivity_type(flow, subject) do - case Flows.fetch_last_activity_for(flow, subject) do - {:ok, activity} -> to_string(activity.connectivity_type) - _other -> "N/A" - end - end - - def handle_params(params, uri, socket) do - socket = handle_live_tables_params(socket, params, uri) - {:noreply, socket} - end - - def handle_activities_update!(socket, list_opts) do - with {:ok, activities, metadata} <- - Flows.list_flow_activities_for(socket.assigns.flow, socket.assigns.subject, list_opts) do - {:ok, - assign(socket, - activities: activities, - activities_metadata: metadata - )} - end - end - - def render(assigns) do - ~H""" - <.breadcrumbs account={@account}> - <.breadcrumb>Flows - <.breadcrumb path={~p"/#{@account}/flows/#{@flow.id}"}> - {@flow.client.name} flow - - - - <.section> - <:title> - Flow for: {@flow.client.name} - - <:action> - <.button - navigate={~p"/#{@account}/flows/#{@flow}/activities.csv"} - icon="hero-arrow-down-on-square" - > - Export to CSV - - - <:content flash={@flash}> - <.vertical_table id="flow"> - <.vertical_table_row> - <:label>Authorized At - <:value> - <.relative_datetime datetime={@flow.inserted_at} /> - - - <.vertical_table_row> - <:label>Expires At - <:value> - <.relative_datetime datetime={@flow.expires_at} /> - - - <.vertical_table_row> - <:label>Policy - <:value> - <.link navigate={~p"/#{@account}/policies/#{@flow.policy_id}"} class={link_style()}> - <.policy_name policy={@flow.policy} /> - - - - <.vertical_table_row> - <:label>Client - <:value> - <.link navigate={~p"/#{@account}/clients/#{@flow.client_id}"} class={link_style()}> - {@flow.client.name} - -
Remote IP: {@flow.client_remote_ip}
-
User Agent: {@flow.client_user_agent}
- - - <.vertical_table_row> - <:label>Gateway - <:value> - <.link navigate={~p"/#{@account}/gateways/#{@flow.gateway_id}"} class={link_style()}> - {@flow.gateway.group.name}-{@flow.gateway.name} - -
- Remote IP: {@flow.gateway_remote_ip} -
- - - <.vertical_table_row> - <:label>Resource - <:value> - <.link navigate={~p"/#{@account}/resources/#{@flow.resource_id}"} class={link_style()}> - {@flow.resource.name} - - - - <.vertical_table_row> - <:label>Connectivity Type - <:value> - {@last_used_connectivity_type} - - - - - - - <.section> - <:title>Metrics - <:help> - Pre-aggregated metrics for this flow. - - <:content> - <.live_table - id="activities" - rows={@activities} - row_id={&"activities-#{&1.id}"} - filters={@filters_by_table_id["activities"]} - filter={@filter_form_by_table_id["activities"]} - ordered_by={@order_by_table_id["activities"]} - metadata={@activities_metadata} - > - <:col :let={activity} label="started"> - <.relative_datetime datetime={activity.window_started_at} /> - - <:col :let={activity} label="ended"> - <.relative_datetime datetime={activity.window_ended_at} /> - - <:col :let={activity} label="destination"> - {activity.destination} - - <:col :let={activity} label="connectivity type"> - {activity.connectivity_type} - - <:col :let={activity} label="rx"> - {Sizeable.filesize(activity.rx_bytes)} - - <:col :let={activity} label="tx"> - {Sizeable.filesize(activity.tx_bytes)} - - <:col :let={activity} label="blocked tx"> - {Sizeable.filesize(activity.blocked_tx_bytes)} - - <:empty> -
No metrics to display.
- - - - - """ - end - - def handle_event(event, params, socket) when event in ["paginate", "order_by", "filter"], - do: handle_live_table_event(event, params, socket) -end diff --git a/elixir/apps/web/lib/web/live/policies/show.ex b/elixir/apps/web/lib/web/live/policies/show.ex index adec338c5..44751af53 100644 --- a/elixir/apps/web/lib/web/live/policies/show.ex +++ b/elixir/apps/web/lib/web/live/policies/show.ex @@ -270,11 +270,6 @@ defmodule Web.Policies.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={link_style()}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/live/resources/show.ex b/elixir/apps/web/lib/web/live/resources/show.ex index d4c206a51..faa9ecf14 100644 --- a/elixir/apps/web/lib/web/live/resources/show.ex +++ b/elixir/apps/web/lib/web/live/resources/show.ex @@ -366,11 +366,6 @@ defmodule Web.Resources.Show do
{flow.gateway_remote_ip} - <:col :let={flow} :if={@flow_activities_enabled?} label="activity"> - <.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}> - Show - - <:empty>
No activity to display.
diff --git a/elixir/apps/web/lib/web/router.ex b/elixir/apps/web/lib/web/router.ex index d94eea368..89ab70cda 100644 --- a/elixir/apps/web/lib/web/router.ex +++ b/elixir/apps/web/lib/web/router.ex @@ -207,11 +207,6 @@ defmodule Web.Router do live "/:id", Show end - scope "/flows", Flows do - live "/:id", Show - get "/:id/activities.csv", DownloadActivities, :download - end - scope "/settings", Settings do scope "/account" do live "/", Account diff --git a/elixir/apps/web/test/web/live/flows/show_test.exs b/elixir/apps/web/test/web/live/flows/show_test.exs deleted file mode 100644 index 830e03ac9..000000000 --- a/elixir/apps/web/test/web/live/flows/show_test.exs +++ /dev/null @@ -1,224 +0,0 @@ -defmodule Web.Live.Flows.ShowTest do - use Web.ConnCase, async: true - - setup do - account = Fixtures.Accounts.create_account() - actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account) - identity = Fixtures.Auth.create_identity(account: account, actor: actor) - subject = Fixtures.Auth.create_subject(account: account, actor: actor, identity: identity) - - client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity) - flow = Fixtures.Flows.create_flow(account: account, client: client) - - %{ - account: account, - actor: actor, - identity: identity, - subject: subject, - client: client, - flow: flow - } - end - - test "redirects to sign in page for unauthorized user", %{ - account: account, - flow: flow, - conn: conn - } do - path = ~p"/#{account}/flows/#{flow}" - - assert live(conn, path) == - {:error, - {:redirect, - %{ - to: ~p"/#{account}?#{%{redirect_to: path}}", - flash: %{"error" => "You must sign in to access this page."} - }}} - end - - test "renders 404 error when flow activities are not enabled", %{ - account: account, - identity: identity, - flow: flow, - conn: conn - } do - {:ok, account} = - Domain.Accounts.update_account(account, %{ - features: %{ - flow_activities: false - } - }) - - assert_raise Web.LiveErrors.NotFoundError, fn -> - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") =~ "404" - end - end - - test "renders breadcrumbs item", %{ - account: account, - flow: flow, - client: client, - identity: identity, - conn: conn - } do - {:ok, _lv, html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - assert item = Floki.find(html, "[aria-label='Breadcrumb']") - breadcrumbs = String.trim(Floki.text(item)) - assert breadcrumbs =~ "Flows" - assert breadcrumbs =~ "#{client.name} flow" - end - - test "renders flows details", %{ - account: account, - identity: identity, - flow: flow, - conn: conn - } do - flow = - Repo.preload(flow, - policy: [:resource, :actor_group], - client: [], - gateway: [:group], - resource: [] - ) - - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second) - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - table = - lv - |> element("#flow") - |> render() - |> vertical_table_to_map() - - assert table["authorized at"] - assert table["expires at"] - - assert table["connectivity type"] =~ to_string(activity.connectivity_type) - - assert table["client"] =~ flow.client.name - assert table["client"] =~ to_string(flow.client_remote_ip) - assert table["client"] =~ flow.client_user_agent - - assert table["gateway"] =~ flow.gateway.name - assert table["gateway"] =~ to_string(flow.gateway_remote_ip) - - assert table["resource"] =~ flow.resource.name - - assert table["policy"] =~ flow.policy.resource.name - assert table["policy"] =~ flow.policy.actor_group.name - end - - test "allows downloading activities", %{ - account: account, - flow: flow, - identity: identity, - conn: conn - } do - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second) - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - lv - |> element("a", "Export to CSV") - |> render_click() - - assert_redirected(lv, ~p"/#{account}/flows/#{flow}/activities.csv") - - controller_conn = get(conn, ~p"/#{account}/flows/#{flow}/activities.csv") - assert redirected_to(controller_conn) =~ ~p"/#{account}" - assert flash(controller_conn, :error) == "You must sign in to access this page." - - controller_conn = - conn - |> authorize_conn(identity) - |> get(~p"/#{account}/flows/#{flow}/activities.csv") - - assert response = response(controller_conn, 200) - - assert response - |> String.trim() - |> String.split("\n") - |> Enum.map(&String.split(&1, "\t")) == - [ - [ - "window_started_at", - "window_ended_at", - "destination", - "connectivity_type", - "rx_bytes", - "tx_bytes", - "blocked_tx_bytes" - ], - [ - to_string(activity.window_started_at), - to_string(activity.window_ended_at), - to_string(activity.destination), - to_string(activity.connectivity_type), - to_string(activity.rx_bytes), - to_string(activity.tx_bytes), - to_string(activity.blocked_tx_bytes) - ] - ] - end - - test "renders activities table", %{ - account: account, - flow: flow, - identity: identity, - conn: conn - } do - activity = - Fixtures.Flows.create_activity( - account: account, - flow: flow, - window_started_at: DateTime.truncate(flow.inserted_at, :second), - window_ended_at: DateTime.truncate(flow.expires_at, :second), - tx_bytes: 1024 * 1024 * 1024 * 42 - ) - - {:ok, lv, _html} = - conn - |> authorize_conn(identity) - |> live(~p"/#{account}/flows/#{flow}") - - [row] = - lv - |> element("#activities") - |> render() - |> table_to_map() - - assert row["started"] - assert row["ended"] - - assert row["connectivity type"] == to_string(activity.connectivity_type) - assert row["destination"] == to_string(activity.destination) - assert row["rx"] == "#{activity.rx_bytes} B" - assert row["tx"] == "42 GB" - end -end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 46abe73b0..84b133d8d 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -59,7 +59,6 @@ config :domain, Domain.ChangeLogs.ReplicationConnection, auth_providers clients flow_activities - flows gateway_groups gateways policies @@ -95,7 +94,6 @@ config :domain, Domain.Events.ReplicationConnection, auth_providers clients flow_activities - flows gateway_groups gateways policies