mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 18:18:55 +00:00
refactor(portal): don't rely on flows.expires_at (#9692)
The `expires_at` column on the `flows` table was never used outside of
the context in which the flow was created in the Client Channel. This
ephemeral state, which is created in the `Domain.Flows.authorize_flow/4`
function, is never read from the DB in any meaningful capacity, so it
can be safely removed.
The `expire_flows_for` family of functions now simply reads the needed
fields from the flows table in order to broadcast `{:expire_flow,
flow_id, client_id, resource_id}` directly to the subscribed entities.
This PR is step 1 in removing the reliance on `Flows` to manage
ephemeral access state. In a subsequent PR we will actually change the
structure of what state is kept in the channel PIDs such that reliance
on this Flows table will no longer be necessary.
Additionally, in a few places, we were referencing a Flows.Show view
that was never available in production, so this dead code has been
removed.
Lastly, the `flows` table subscription and associated hook processing
has been completely removed as it is no longer needed. We've implemented
in #9667 logic to remove publications from removed table subscriptions,
so we can expect to get a couple ingest warnings when we deploy this as
the `Hooks.Flows` processor no longer exists, and the WAL data may have
lingering flows records in the queue. These can be safely ignored.
This commit is contained in:
2
.github/workflows/_integration_tests.yml
vendored
2
.github/workflows/_integration_tests.yml
vendored
@@ -120,7 +120,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
project: firezone-staging
|
project: firezone-staging
|
||||||
- name: Seed database
|
- name: Seed database
|
||||||
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed'
|
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.migrate --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations && mix ecto.seed'
|
||||||
- name: Start docker compose in the background
|
- name: Start docker compose in the background
|
||||||
run: |
|
run: |
|
||||||
set -xe
|
set -xe
|
||||||
|
|||||||
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -306,7 +306,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
project: firezone-staging
|
project: firezone-staging
|
||||||
- name: Seed database
|
- name: Seed database
|
||||||
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed'
|
run: docker compose run elixir /bin/sh -c 'cd apps/domain && mix ecto.seed --migrations-path priv/repo/migrations --migrations-path priv/repo/manual_migrations'
|
||||||
- name: Start docker compose in the background
|
- name: Start docker compose in the background
|
||||||
run: |
|
run: |
|
||||||
# We need to increase the log level to make sure that they don't hold off storm of packets
|
# We need to increase the log level to make sure that they don't hold off storm of packets
|
||||||
|
|||||||
@@ -643,7 +643,7 @@ defmodule API.Client.Channel do
|
|||||||
OpenTelemetry.Tracer.set_attribute(:gateways_count, length(gateways)),
|
OpenTelemetry.Tracer.set_attribute(:gateways_count, length(gateways)),
|
||||||
gateway = Gateways.load_balance_gateways(location, gateways, connected_gateway_ids),
|
gateway = Gateways.load_balance_gateways(location, gateways, connected_gateway_ids),
|
||||||
OpenTelemetry.Tracer.set_attribute(:gateway_id, gateway.id),
|
OpenTelemetry.Tracer.set_attribute(:gateway_id, gateway.id),
|
||||||
{:ok, resource, flow} <-
|
{:ok, resource, flow, expires_at} <-
|
||||||
Flows.authorize_flow(
|
Flows.authorize_flow(
|
||||||
socket.assigns.client,
|
socket.assigns.client,
|
||||||
gateway,
|
gateway,
|
||||||
@@ -665,7 +665,7 @@ defmodule API.Client.Channel do
|
|||||||
client_id: socket.assigns.client.id,
|
client_id: socket.assigns.client.id,
|
||||||
resource_id: resource.id,
|
resource_id: resource.id,
|
||||||
flow_id: flow.id,
|
flow_id: flow.id,
|
||||||
authorization_expires_at: flow.expires_at,
|
authorization_expires_at: expires_at,
|
||||||
ice_credentials: ice_credentials,
|
ice_credentials: ice_credentials,
|
||||||
preshared_key: preshared_key
|
preshared_key: preshared_key
|
||||||
}, {opentelemetry_ctx, opentelemetry_span_ctx}}
|
}, {opentelemetry_ctx, opentelemetry_span_ctx}}
|
||||||
@@ -789,7 +789,7 @@ defmodule API.Client.Channel do
|
|||||||
|
|
||||||
OpenTelemetry.Tracer.with_span "client.reuse_connection", attributes: attrs do
|
OpenTelemetry.Tracer.with_span "client.reuse_connection", attributes: attrs do
|
||||||
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
||||||
{:ok, resource, flow} <-
|
{:ok, resource, flow, _expires_at} <-
|
||||||
Flows.authorize_flow(
|
Flows.authorize_flow(
|
||||||
socket.assigns.client,
|
socket.assigns.client,
|
||||||
gateway,
|
gateway,
|
||||||
@@ -851,7 +851,7 @@ defmodule API.Client.Channel do
|
|||||||
|
|
||||||
OpenTelemetry.Tracer.with_span "client.request_connection", attributes: ctx_attrs do
|
OpenTelemetry.Tracer.with_span "client.request_connection", attributes: ctx_attrs do
|
||||||
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
|
||||||
{:ok, resource, flow} <-
|
{:ok, resource, flow, _expires_at} <-
|
||||||
Flows.authorize_flow(
|
Flows.authorize_flow(
|
||||||
socket.assigns.client,
|
socket.assigns.client,
|
||||||
gateway,
|
gateway,
|
||||||
|
|||||||
@@ -224,7 +224,7 @@ defmodule API.Gateway.ChannelTest do
|
|||||||
|
|
||||||
assert_push "allow_access", %{}
|
assert_push "allow_access", %{}
|
||||||
|
|
||||||
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
|
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
|
||||||
|
|
||||||
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
||||||
|
|
||||||
@@ -647,7 +647,7 @@ defmodule API.Gateway.ChannelTest do
|
|||||||
|
|
||||||
assert_push "request_connection", %{}
|
assert_push "request_connection", %{}
|
||||||
|
|
||||||
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
|
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
|
||||||
|
|
||||||
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
||||||
|
|
||||||
@@ -878,7 +878,7 @@ defmodule API.Gateway.ChannelTest do
|
|||||||
|
|
||||||
assert_push "authorize_flow", %{}
|
assert_push "authorize_flow", %{}
|
||||||
|
|
||||||
{:ok, [_flow]} = Domain.Flows.expire_flows_for(resource, subject)
|
assert :ok = Domain.Flows.expire_flows_for(resource, subject)
|
||||||
|
|
||||||
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
send(socket.channel_pid, {:expire_flow, flow.id, client.id, resource.id})
|
||||||
|
|
||||||
|
|||||||
@@ -34,13 +34,6 @@ defmodule Domain.ChangeLogs.ReplicationConnection do
|
|||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
defp log(_op, _lsn, "flows", _old_data, _data) do
|
|
||||||
# TODO: WAL
|
|
||||||
# Flows are not logged to the change log as they are used only to trigger side effects which
|
|
||||||
# will be removed. Remove the flows table publication when that happens.
|
|
||||||
:ok
|
|
||||||
end
|
|
||||||
|
|
||||||
defp log(op, lsn, table, old_data, data) do
|
defp log(op, lsn, table, old_data, data) do
|
||||||
attrs = %{
|
attrs = %{
|
||||||
op: op,
|
op: op,
|
||||||
|
|||||||
@@ -212,8 +212,10 @@ defmodule Domain.Clients do
|
|||||||
preload: [:online?]
|
preload: [:online?]
|
||||||
)
|
)
|
||||||
|> case do
|
|> case do
|
||||||
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
{:ok, client} ->
|
{:ok, client} ->
|
||||||
{:ok, _flows} = Flows.expire_flows_for(client)
|
:ok = Flows.expire_flows_for(client)
|
||||||
{:ok, client}
|
{:ok, client}
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
|
|||||||
@@ -22,7 +22,10 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do
|
|||||||
Task.start(fn ->
|
Task.start(fn ->
|
||||||
:ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id})
|
:ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id})
|
||||||
broadcast_access(:reject, actor_id, group_id)
|
broadcast_access(:reject, actor_id, group_id)
|
||||||
{:ok, _flows} = Flows.expire_flows_for(account_id, actor_id, group_id)
|
|
||||||
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Flows.expire_flows_for(account_id, actor_id, group_id)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -1,51 +0,0 @@
|
|||||||
# TODO: WAL
|
|
||||||
# Move side-effects from flows to state table in clients and gateways
|
|
||||||
defmodule Domain.Events.Hooks.Flows do
|
|
||||||
@behaviour Domain.Events.Hooks
|
|
||||||
alias Domain.PubSub
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def on_insert(_data), do: :ok
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def on_update(
|
|
||||||
_old_data,
|
|
||||||
%{
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id,
|
|
||||||
"expires_at" => expires_at
|
|
||||||
} = _data
|
|
||||||
) do
|
|
||||||
if expired?(expires_at) do
|
|
||||||
# Flow has become expired
|
|
||||||
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
|
|
||||||
else
|
|
||||||
:ok
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases
|
|
||||||
# where we might manually clear flows in a migration or some other mechanism.
|
|
||||||
@impl true
|
|
||||||
def on_delete(
|
|
||||||
%{
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id
|
|
||||||
} = _old_data
|
|
||||||
) do
|
|
||||||
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
|
|
||||||
end
|
|
||||||
|
|
||||||
defp expired?(nil), do: false
|
|
||||||
|
|
||||||
defp expired?(expires_at) do
|
|
||||||
with {:ok, expires_at, _} <- DateTime.from_iso8601(expires_at) do
|
|
||||||
DateTime.compare(DateTime.utc_now(), expires_at) == :gt
|
|
||||||
else
|
|
||||||
_ -> false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -69,7 +69,9 @@ defmodule Domain.Events.Hooks.Policies do
|
|||||||
payload = {:reject_access, policy_id, actor_group_id, resource_id}
|
payload = {:reject_access, policy_id, actor_group_id, resource_id}
|
||||||
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
||||||
|
|
||||||
Flows.expire_flows_for_policy_id(account_id, policy_id)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
@@ -124,7 +126,9 @@ defmodule Domain.Events.Hooks.Policies do
|
|||||||
payload = {:allow_access, policy_id, actor_group_id, resource_id}
|
payload = {:allow_access, policy_id, actor_group_id, resource_id}
|
||||||
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
||||||
|
|
||||||
Flows.expire_flows_for_policy_id(account_id, policy_id)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
|
||||||
end)
|
end)
|
||||||
else
|
else
|
||||||
Logger.warning("Breaking update ignored for policy as it is deleted or disabled",
|
Logger.warning("Breaking update ignored for policy as it is deleted or disabled",
|
||||||
@@ -161,7 +165,9 @@ defmodule Domain.Events.Hooks.Policies do
|
|||||||
payload = {:reject_access, policy_id, actor_group_id, resource_id}
|
payload = {:reject_access, policy_id, actor_group_id, resource_id}
|
||||||
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
|
||||||
|
|
||||||
Flows.expire_flows_for_policy_id(account_id, policy_id)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Flows.expire_flows_for_policy_id(account_id, policy_id)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -11,11 +11,10 @@ defmodule Domain.Events.Hooks.ResourceConnections do
|
|||||||
@impl true
|
@impl true
|
||||||
def on_delete(%{"account_id" => account_id, "resource_id" => resource_id} = _old_data) do
|
def on_delete(%{"account_id" => account_id, "resource_id" => resource_id} = _old_data) do
|
||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# The flow expires_at field is not used for any persistence-related reason.
|
# Broadcast flow side effects directly
|
||||||
# Remove it and broadcast directly to subscribed pids to remove the flow
|
# This hook is called when resources change sites.
|
||||||
# from their local state. This hook is called when resources change sites.
|
|
||||||
Task.start(fn ->
|
Task.start(fn ->
|
||||||
{:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id)
|
:ok = Flows.expire_flows_for_resource_id(account_id, resource_id)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ defmodule Domain.Events.Hooks.Resources do
|
|||||||
old_filters != filters or
|
old_filters != filters or
|
||||||
old_ip_stack != ip_stack do
|
old_ip_stack != ip_stack do
|
||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# Directly broadcast to subscribed pids to remove the flow
|
# Broadcast flow side effects directly
|
||||||
Task.start(fn ->
|
Task.start(fn ->
|
||||||
payload = {:delete_resource, resource_id}
|
payload = {:delete_resource, resource_id}
|
||||||
PubSub.Resource.broadcast(resource_id, payload)
|
PubSub.Resource.broadcast(resource_id, payload)
|
||||||
@@ -49,7 +49,7 @@ defmodule Domain.Events.Hooks.Resources do
|
|||||||
PubSub.Resource.broadcast(resource_id, payload)
|
PubSub.Resource.broadcast(resource_id, payload)
|
||||||
PubSub.Account.Resources.broadcast(account_id, payload)
|
PubSub.Account.Resources.broadcast(account_id, payload)
|
||||||
|
|
||||||
{:ok, _flows} = Flows.expire_flows_for_resource_id(account_id, resource_id)
|
:ok = Flows.expire_flows_for_resource_id(account_id, resource_id)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ defmodule Domain.Events.ReplicationConnection do
|
|||||||
"auth_providers" => Hooks.AuthProviders,
|
"auth_providers" => Hooks.AuthProviders,
|
||||||
"clients" => Hooks.Clients,
|
"clients" => Hooks.Clients,
|
||||||
"flow_activities" => Hooks.FlowActivities,
|
"flow_activities" => Hooks.FlowActivities,
|
||||||
"flows" => Hooks.Flows,
|
|
||||||
"gateway_groups" => Hooks.GatewayGroups,
|
"gateway_groups" => Hooks.GatewayGroups,
|
||||||
"gateways" => Hooks.Gateways,
|
"gateways" => Hooks.Gateways,
|
||||||
"policies" => Hooks.Policies,
|
"policies" => Hooks.Policies,
|
||||||
|
|||||||
@@ -43,12 +43,13 @@ defmodule Domain.Flows do
|
|||||||
account_id: account_id,
|
account_id: account_id,
|
||||||
client_remote_ip: client_remote_ip,
|
client_remote_ip: client_remote_ip,
|
||||||
client_user_agent: client_user_agent,
|
client_user_agent: client_user_agent,
|
||||||
gateway_remote_ip: gateway_remote_ip,
|
gateway_remote_ip: gateway_remote_ip
|
||||||
expires_at: conformation_expires_at || expires_at
|
|
||||||
})
|
})
|
||||||
|> Repo.insert!()
|
|> Repo.insert!()
|
||||||
|
|
||||||
{:ok, resource, flow}
|
expires_at = conformation_expires_at || expires_at
|
||||||
|
|
||||||
|
{:ok, resource, flow, expires_at}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -250,11 +251,32 @@ defmodule Domain.Flows do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp expire_flows(queryable) do
|
defp expire_flows(queryable) do
|
||||||
{_count, flows} =
|
{:ok, :ok} =
|
||||||
queryable
|
Repo.transaction(fn ->
|
||||||
|> Flow.Query.expire()
|
queryable
|
||||||
|> Repo.update_all([])
|
|> Repo.stream()
|
||||||
|
|> Stream.chunk_every(100)
|
||||||
|
|> Enum.each(fn chunk ->
|
||||||
|
Enum.each(chunk, &broadcast_flow_expiration/1)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
{:ok, flows}
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
defp broadcast_flow_expiration(flow) do
|
||||||
|
case Domain.PubSub.Flow.broadcast(
|
||||||
|
flow.id,
|
||||||
|
{:expire_flow, flow.id, flow.client_id, flow.resource_id}
|
||||||
|
) do
|
||||||
|
:ok ->
|
||||||
|
:ok
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("Failed to broadcast flow expiration",
|
||||||
|
reason: inspect(reason),
|
||||||
|
flow_id: flow.id
|
||||||
|
)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
# TODO: Flows -> PolicyAuthorizations
|
||||||
|
# When this is renamed, "PolicyAuthorizations Authorizer" makes no sense, remove this module
|
||||||
defmodule Domain.Flows.Authorizer do
|
defmodule Domain.Flows.Authorizer do
|
||||||
use Domain.Auth.Authorizer
|
use Domain.Auth.Authorizer
|
||||||
alias Domain.Flows.{Flow, Activity}
|
alias Domain.Flows.{Flow, Activity}
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ defmodule Domain.Flows.Flow do
|
|||||||
|
|
||||||
field :gateway_remote_ip, Domain.Types.IP
|
field :gateway_remote_ip, Domain.Types.IP
|
||||||
|
|
||||||
field :expires_at, :utc_datetime_usec
|
|
||||||
timestamps(updated_at: false)
|
timestamps(updated_at: false)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -5,14 +5,12 @@ defmodule Domain.Flows.Flow.Changeset do
|
|||||||
@fields ~w[token_id policy_id client_id gateway_id resource_id
|
@fields ~w[token_id policy_id client_id gateway_id resource_id
|
||||||
account_id
|
account_id
|
||||||
client_remote_ip client_user_agent
|
client_remote_ip client_user_agent
|
||||||
gateway_remote_ip
|
gateway_remote_ip]a
|
||||||
expires_at]a
|
|
||||||
@required_fields @fields -- ~w[expires_at]a
|
|
||||||
|
|
||||||
def create(attrs) do
|
def create(attrs) do
|
||||||
%Flow{}
|
%Flow{}
|
||||||
|> cast(attrs, @fields)
|
|> cast(attrs, @fields)
|
||||||
|> validate_required(@required_fields)
|
|> validate_required(@fields)
|
||||||
|> assoc_constraint(:token)
|
|> assoc_constraint(:token)
|
||||||
|> assoc_constraint(:policy)
|
|> assoc_constraint(:policy)
|
||||||
|> assoc_constraint(:client)
|
|> assoc_constraint(:client)
|
||||||
|
|||||||
@@ -5,10 +5,6 @@ defmodule Domain.Flows.Flow.Query do
|
|||||||
from(flows in Domain.Flows.Flow, as: :flows)
|
from(flows in Domain.Flows.Flow, as: :flows)
|
||||||
end
|
end
|
||||||
|
|
||||||
def not_expired(queryable \\ all()) do
|
|
||||||
where(queryable, [flows: flows], flows.expires_at > fragment("timezone('UTC', NOW())"))
|
|
||||||
end
|
|
||||||
|
|
||||||
def by_id(queryable, id) do
|
def by_id(queryable, id) do
|
||||||
where(queryable, [flows: flows], flows.id == ^id)
|
where(queryable, [flows: flows], flows.id == ^id)
|
||||||
end
|
end
|
||||||
@@ -61,17 +57,6 @@ defmodule Domain.Flows.Flow.Query do
|
|||||||
where(queryable, [flows: flows], flows.gateway_id == ^gateway_id)
|
where(queryable, [flows: flows], flows.gateway_id == ^gateway_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
def expire(queryable) do
|
|
||||||
queryable
|
|
||||||
|> not_expired()
|
|
||||||
|> Ecto.Query.select([flows: flows], flows)
|
|
||||||
|> Ecto.Query.update([flows: flows],
|
|
||||||
set: [
|
|
||||||
expires_at: fragment("LEAST(?, timezone('UTC', NOW()))", flows.expires_at)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
def with_joined_policy(queryable) do
|
def with_joined_policy(queryable) do
|
||||||
with_named_binding(queryable, :policy, fn queryable, binding ->
|
with_named_binding(queryable, :policy, fn queryable, binding ->
|
||||||
join(queryable, :inner, [flows: flows], policy in assoc(flows, ^binding), as: ^binding)
|
join(queryable, :inner, [flows: flows], policy in assoc(flows, ^binding), as: ^binding)
|
||||||
@@ -100,27 +85,4 @@ defmodule Domain.Flows.Flow.Query do
|
|||||||
{:flows, :desc, :inserted_at},
|
{:flows, :desc, :inserted_at},
|
||||||
{:flows, :asc, :id}
|
{:flows, :asc, :id}
|
||||||
]
|
]
|
||||||
|
|
||||||
@impl Domain.Repo.Query
|
|
||||||
def filters,
|
|
||||||
do: [
|
|
||||||
%Domain.Repo.Filter{
|
|
||||||
name: :expiration,
|
|
||||||
title: "Expired",
|
|
||||||
type: :string,
|
|
||||||
values: [
|
|
||||||
{"Expired", "expired"},
|
|
||||||
{"Not Expired", "not_expired"}
|
|
||||||
],
|
|
||||||
fun: &filter_by_expired/2
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
def filter_by_expired(queryable, "expired") do
|
|
||||||
{queryable, dynamic([flows: flows], flows.expires_at < fragment("timezone('UTC', NOW())"))}
|
|
||||||
end
|
|
||||||
|
|
||||||
def filter_by_expired(queryable, "not_expired") do
|
|
||||||
{queryable, dynamic([flows: flows], flows.expires_at >= fragment("timezone('UTC', NOW())"))}
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -183,7 +183,9 @@ defmodule Domain.Tokens do
|
|||||||
]}
|
]}
|
||||||
|
|
||||||
with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do
|
with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(token, subject)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(token, subject)
|
||||||
|
|
||||||
Token.Query.not_deleted()
|
Token.Query.not_deleted()
|
||||||
|> Token.Query.by_id(token.id)
|
|> Token.Query.by_id(token.id)
|
||||||
@@ -203,7 +205,9 @@ defmodule Domain.Tokens do
|
|||||||
|> delete_tokens()
|
|> delete_tokens()
|
||||||
|> case do
|
|> case do
|
||||||
{:ok, [token]} ->
|
{:ok, [token]} ->
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(token, subject)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(token, subject)
|
||||||
{:ok, token}
|
{:ok, token}
|
||||||
|
|
||||||
{:ok, []} ->
|
{:ok, []} ->
|
||||||
@@ -212,7 +216,9 @@ defmodule Domain.Tokens do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def delete_tokens_for(%Auth.Identity{} = identity) do
|
def delete_tokens_for(%Auth.Identity{} = identity) do
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(identity)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(identity)
|
||||||
|
|
||||||
Token.Query.not_deleted()
|
Token.Query.not_deleted()
|
||||||
|> Token.Query.by_identity_id(identity.id)
|
|> Token.Query.by_identity_id(identity.id)
|
||||||
@@ -221,7 +227,9 @@ defmodule Domain.Tokens do
|
|||||||
|
|
||||||
def delete_tokens_for(%Actors.Actor{} = actor, %Auth.Subject{} = subject) do
|
def delete_tokens_for(%Actors.Actor{} = actor, %Auth.Subject{} = subject) do
|
||||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(actor, subject)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(actor, subject)
|
||||||
|
|
||||||
Token.Query.not_deleted()
|
Token.Query.not_deleted()
|
||||||
|> Token.Query.by_actor_id(actor.id)
|
|> Token.Query.by_actor_id(actor.id)
|
||||||
@@ -232,7 +240,9 @@ defmodule Domain.Tokens do
|
|||||||
|
|
||||||
def delete_tokens_for(%Auth.Identity{} = identity, %Auth.Subject{} = subject) do
|
def delete_tokens_for(%Auth.Identity{} = identity, %Auth.Subject{} = subject) do
|
||||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(identity, subject)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(identity, subject)
|
||||||
|
|
||||||
Token.Query.not_deleted()
|
Token.Query.not_deleted()
|
||||||
|> Token.Query.by_identity_id(identity.id)
|
|> Token.Query.by_identity_id(identity.id)
|
||||||
@@ -243,7 +253,9 @@ defmodule Domain.Tokens do
|
|||||||
|
|
||||||
def delete_tokens_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do
|
def delete_tokens_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do
|
||||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_tokens_permission()) do
|
||||||
{:ok, _flows} = Domain.Flows.expire_flows_for(provider, subject)
|
# TODO: WAL
|
||||||
|
# Broadcast flow side effects directly
|
||||||
|
:ok = Domain.Flows.expire_flows_for(provider, subject)
|
||||||
|
|
||||||
Token.Query.not_deleted()
|
Token.Query.not_deleted()
|
||||||
|> Token.Query.by_provider_id(provider.id)
|
|> Token.Query.by_provider_id(provider.id)
|
||||||
|
|||||||
@@ -0,0 +1,15 @@
|
|||||||
|
defmodule Domain.Repo.Migrations.DropExpiresAtFromFlows do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
alter table(:flows) do
|
||||||
|
remove(:expires_at)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
alter table(:flows) do
|
||||||
|
add(:expires_at, :utc_datetime_usec, null: false)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
defmodule Domain.Repo.Migrations.ChangeColumnNullFlowsExpiresAt do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
alter table(:flows) do
|
||||||
|
modify(:expires_at, :utc_datetime_usec, null: true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
alter table(:flows) do
|
||||||
|
modify(:expires_at, :utc_datetime_usec, null: false)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -1176,7 +1176,7 @@ defmodule Domain.Repo.Seeds do
|
|||||||
|
|
||||||
IO.puts("")
|
IO.puts("")
|
||||||
|
|
||||||
{:ok, _resource, flow} =
|
{:ok, _resource, flow, _expires_at} =
|
||||||
Flows.authorize_flow(
|
Flows.authorize_flow(
|
||||||
user_iphone,
|
user_iphone,
|
||||||
gateway1,
|
gateway1,
|
||||||
|
|||||||
@@ -1248,12 +1248,12 @@ defmodule Domain.ActorsTest do
|
|||||||
"group_id" => group2.id
|
"group_id" => group2.id
|
||||||
})
|
})
|
||||||
|
|
||||||
# TODO: WAL
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
# Remove this when direct broadcast is implemented
|
|
||||||
Process.sleep(100)
|
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
flow_id = flow.id
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "deletes memberships of removed groups", %{
|
test "deletes memberships of removed groups", %{
|
||||||
@@ -3043,16 +3043,21 @@ defmodule Domain.ActorsTest do
|
|||||||
subject = Fixtures.Auth.create_subject(identity: identity)
|
subject = Fixtures.Auth.create_subject(identity: identity)
|
||||||
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
||||||
|
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
subject: subject,
|
account: account,
|
||||||
client: client
|
subject: subject,
|
||||||
)
|
client: client
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
assert {:ok, _actor} = disable_actor(actor, subject)
|
assert {:ok, _actor} = disable_actor(actor, subject)
|
||||||
|
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
flow_id = flow.id
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when trying to disable the last admin actor" do
|
test "returns error when trying to disable the last admin actor" do
|
||||||
@@ -3287,16 +3292,21 @@ defmodule Domain.ActorsTest do
|
|||||||
} do
|
} do
|
||||||
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
||||||
|
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
subject: subject,
|
account: account,
|
||||||
client: client
|
subject: subject,
|
||||||
)
|
client: client
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
assert {:ok, _actor} = delete_actor(actor, subject)
|
assert {:ok, _actor} = delete_actor(actor, subject)
|
||||||
|
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
flow_id = flow.id
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when trying to delete the last admin actor", %{
|
test "returns error when trying to delete the last admin actor", %{
|
||||||
|
|||||||
@@ -629,6 +629,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
|
|||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit)
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
||||||
@@ -752,12 +754,16 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
|
|||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
# Deleted policies expire all flows authorized by them
|
# Deleted policies expire all flows authorized by them
|
||||||
deleted_group_flow = Repo.reload(deleted_group_flow)
|
flow_id = deleted_group_flow.id
|
||||||
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_group_flow.client_id
|
||||||
|
resource_id = deleted_group_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Expires flows for signed out user
|
# Expires flows for signed out user
|
||||||
deleted_identity_flow = Repo.reload(deleted_identity_flow)
|
flow_id = deleted_identity_flow.id
|
||||||
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_identity_flow.client_id
|
||||||
|
resource_id = deleted_identity_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Should not do anything else
|
# Should not do anything else
|
||||||
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
||||||
|
|||||||
@@ -416,6 +416,8 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
|
|||||||
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
||||||
@@ -516,12 +518,16 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
|
|||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
# Deleted policies expire all flows authorized by them
|
# Deleted policies expire all flows authorized by them
|
||||||
deleted_group_flow = Repo.reload(deleted_group_flow)
|
flow_id = deleted_group_flow.id
|
||||||
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_group_flow.client_id
|
||||||
|
resource_id = deleted_group_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Expires flows for signed out user
|
# Expires flows for signed out user
|
||||||
deleted_identity_flow = Repo.reload(deleted_identity_flow)
|
flow_id = deleted_identity_flow.id
|
||||||
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_identity_flow.client_id
|
||||||
|
resource_id = deleted_identity_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Should not do anything else
|
# Should not do anything else
|
||||||
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
||||||
|
|||||||
@@ -464,6 +464,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
|
|||||||
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
||||||
@@ -583,12 +585,16 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
|
|||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
# Deleted policies expire all flows authorized by them
|
# Deleted policies expire all flows authorized by them
|
||||||
deleted_group_flow = Repo.reload(deleted_group_flow)
|
flow_id = deleted_group_flow.id
|
||||||
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_group_flow.client_id
|
||||||
|
resource_id = deleted_group_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Expires flows for signed out user
|
# Expires flows for signed out user
|
||||||
deleted_identity_flow = Repo.reload(deleted_identity_flow)
|
flow_id = deleted_identity_flow.id
|
||||||
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_identity_flow.client_id
|
||||||
|
resource_id = deleted_identity_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Should not do anything else
|
# Should not do anything else
|
||||||
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
||||||
|
|||||||
@@ -708,6 +708,8 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
|
|||||||
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
|
||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_identity_flow.id)
|
||||||
|
:ok = PubSub.Flow.subscribe(deleted_group_flow.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
|
||||||
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
|
||||||
@@ -818,12 +820,16 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
|
|||||||
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
|
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
|
||||||
|
|
||||||
# Deleted policies expire all flows authorized by them
|
# Deleted policies expire all flows authorized by them
|
||||||
deleted_group_flow = Repo.reload(deleted_group_flow)
|
flow_id = deleted_group_flow.id
|
||||||
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_group_flow.client_id
|
||||||
|
resource_id = deleted_group_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Expires flows for signed out user
|
# Expires flows for signed out user
|
||||||
deleted_identity_flow = Repo.reload(deleted_identity_flow)
|
flow_id = deleted_identity_flow.id
|
||||||
assert DateTime.compare(deleted_identity_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_identity_flow.client_id
|
||||||
|
resource_id = deleted_identity_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
# Should not do anything else
|
# Should not do anything else
|
||||||
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
refute_received {:allow_access, _policy_id, _group_id, _resource_id}
|
||||||
|
|||||||
@@ -1062,16 +1062,21 @@ defmodule Domain.AuthTest do
|
|||||||
} do
|
} do
|
||||||
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
||||||
|
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
subject: subject,
|
account: account,
|
||||||
client: client
|
subject: subject,
|
||||||
)
|
client: client
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = client.id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
assert {:ok, _provider} = disable_provider(provider, subject)
|
assert {:ok, _provider} = disable_provider(provider, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when trying to disable the last provider", %{
|
test "returns error when trying to disable the last provider", %{
|
||||||
@@ -1282,16 +1287,21 @@ defmodule Domain.AuthTest do
|
|||||||
} do
|
} do
|
||||||
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
||||||
|
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
subject: subject,
|
account: account,
|
||||||
client: client
|
subject: subject,
|
||||||
)
|
client: client
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = client.id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
assert {:ok, _provider} = delete_provider(provider, subject)
|
assert {:ok, _provider} = delete_provider(provider, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when trying to delete the last provider", %{
|
test "returns error when trying to delete the last provider", %{
|
||||||
@@ -1882,6 +1892,8 @@ defmodule Domain.AuthTest do
|
|||||||
token_id: deleted_identity_token.id
|
token_id: deleted_identity_token.id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(deleted_identity_flow.id)
|
||||||
|
|
||||||
for n <- 1..4 do
|
for n <- 1..4 do
|
||||||
Fixtures.Auth.create_identity(
|
Fixtures.Auth.create_identity(
|
||||||
account: account,
|
account: account,
|
||||||
@@ -1948,8 +1960,10 @@ defmodule Domain.AuthTest do
|
|||||||
assert deleted_identity_token.deleted_at
|
assert deleted_identity_token.deleted_at
|
||||||
|
|
||||||
# Expires flows for signed out user
|
# Expires flows for signed out user
|
||||||
reloaded_flow = Repo.reload(deleted_identity_flow)
|
flow_id = deleted_identity_flow.id
|
||||||
assert DateTime.compare(reloaded_flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = deleted_identity_flow.client_id
|
||||||
|
resource_id = deleted_identity_flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "circuit breaker prevents mass deletions of identities", %{
|
test "circuit breaker prevents mass deletions of identities", %{
|
||||||
@@ -2783,18 +2797,23 @@ defmodule Domain.AuthTest do
|
|||||||
} do
|
} do
|
||||||
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, identity: identity)
|
||||||
|
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
identity: identity,
|
account: account,
|
||||||
actor: actor,
|
identity: identity,
|
||||||
subject: subject,
|
actor: actor,
|
||||||
client: client
|
subject: subject,
|
||||||
)
|
client: client
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
assert delete_identities_for(actor, subject) == :ok
|
assert delete_identities_for(actor, subject) == :ok
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not remove identities that belong to another actor", %{
|
test "does not remove identities that belong to another actor", %{
|
||||||
|
|||||||
@@ -1092,11 +1092,15 @@ defmodule Domain.ClientsTest do
|
|||||||
subject: subject
|
subject: subject
|
||||||
)
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
assert {:ok, client} = verify_client(client, subject)
|
assert {:ok, client} = verify_client(client, subject)
|
||||||
assert {:ok, _client} = remove_client_verification(client, subject)
|
assert {:ok, _client} = remove_client_verification(client, subject)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
flow_id = flow.id
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
client_id = client.id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when subject has no permission to verify clients", %{
|
test "returns error when subject has no permission to verify clients", %{
|
||||||
|
|||||||
@@ -1,93 +0,0 @@
|
|||||||
defmodule Domain.Events.Hooks.FlowsTest do
|
|
||||||
use Domain.DataCase, async: true
|
|
||||||
import Domain.Events.Hooks.Flows
|
|
||||||
|
|
||||||
setup do
|
|
||||||
%{old_data: %{}, data: %{}}
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "insert/1" do
|
|
||||||
test "returns :ok", %{data: data} do
|
|
||||||
assert :ok == on_insert(data)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "update/2" do
|
|
||||||
test "broadcasts expire_flow if flow is expired" do
|
|
||||||
flow_id = "flow_123"
|
|
||||||
client_id = "client_123"
|
|
||||||
resource_id = "resource_123"
|
|
||||||
|
|
||||||
old_data = %{}
|
|
||||||
|
|
||||||
data = %{
|
|
||||||
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id
|
|
||||||
}
|
|
||||||
|
|
||||||
:ok = Domain.PubSub.Flow.subscribe(flow_id)
|
|
||||||
|
|
||||||
assert :ok == on_update(old_data, data)
|
|
||||||
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "does not broadcast expire_flow if flow is not expired" do
|
|
||||||
flow_id = "flow_123"
|
|
||||||
client_id = "client_123"
|
|
||||||
resource_id = "resource_123"
|
|
||||||
|
|
||||||
old_data = %{}
|
|
||||||
|
|
||||||
data = %{
|
|
||||||
"expires_at" => DateTime.utc_now() |> DateTime.add(1, :second) |> DateTime.to_iso8601(),
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id
|
|
||||||
}
|
|
||||||
|
|
||||||
:ok = Domain.PubSub.Flow.subscribe(flow_id)
|
|
||||||
|
|
||||||
assert :ok == on_update(old_data, data)
|
|
||||||
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "does not receive broadcast when not subscribed" do
|
|
||||||
flow_id = "flow_123"
|
|
||||||
client_id = "client_123"
|
|
||||||
resource_id = "resource_123"
|
|
||||||
|
|
||||||
old_data = %{}
|
|
||||||
|
|
||||||
data = %{
|
|
||||||
"expires_at" => DateTime.utc_now() |> DateTime.add(-1, :second) |> DateTime.to_iso8601(),
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id
|
|
||||||
}
|
|
||||||
|
|
||||||
assert :ok == on_update(old_data, data)
|
|
||||||
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "delete/1" do
|
|
||||||
test "broadcasts expire_flow" do
|
|
||||||
flow_id = "flow_123"
|
|
||||||
client_id = "client_123"
|
|
||||||
resource_id = "resource_123"
|
|
||||||
|
|
||||||
old_data = %{
|
|
||||||
"id" => flow_id,
|
|
||||||
"client_id" => client_id,
|
|
||||||
"resource_id" => resource_id
|
|
||||||
}
|
|
||||||
|
|
||||||
:ok = Domain.PubSub.Flow.subscribe(flow_id)
|
|
||||||
|
|
||||||
assert :ok == on_delete(old_data)
|
|
||||||
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -57,6 +57,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
|
|
||||||
test "disable: broadcasts :disable_policy and :reject_access" do
|
test "disable: broadcasts :disable_policy and :reject_access" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
policy_id = flow.policy_id
|
policy_id = flow.policy_id
|
||||||
account_id = flow.account_id
|
account_id = flow.account_id
|
||||||
actor_group_id = "group-456"
|
actor_group_id = "group-456"
|
||||||
@@ -72,6 +74,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
|
|
||||||
data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z")
|
data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z")
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(flow_id)
|
||||||
:ok = PubSub.Policy.subscribe(policy_id)
|
:ok = PubSub.Policy.subscribe(policy_id)
|
||||||
:ok = PubSub.Account.Policies.subscribe(account_id)
|
:ok = PubSub.Account.Policies.subscribe(account_id)
|
||||||
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
||||||
@@ -85,13 +88,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "soft-delete: broadcasts :delete_policy and :reject_access" do
|
test "soft-delete: broadcasts :delete_policy and :reject_access" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
policy_id = flow.policy_id
|
policy_id = flow.policy_id
|
||||||
account_id = flow.account_id
|
account_id = flow.account_id
|
||||||
actor_group_id = "group-456"
|
actor_group_id = "group-456"
|
||||||
@@ -107,6 +110,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
|
|
||||||
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
|
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(flow_id)
|
||||||
:ok = PubSub.Policy.subscribe(policy_id)
|
:ok = PubSub.Policy.subscribe(policy_id)
|
||||||
:ok = PubSub.Account.Policies.subscribe(account_id)
|
:ok = PubSub.Account.Policies.subscribe(account_id)
|
||||||
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
||||||
@@ -120,13 +124,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do
|
test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
policy_id = flow.policy_id
|
policy_id = flow.policy_id
|
||||||
account_id = flow.account_id
|
account_id = flow.account_id
|
||||||
actor_group_id = "group-456"
|
actor_group_id = "group-456"
|
||||||
@@ -142,12 +146,17 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
|
|
||||||
data = Map.put(old_data, "resource_id", "new-resource-123")
|
data = Map.put(old_data, "resource_id", "new-resource-123")
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(flow_id)
|
||||||
:ok = PubSub.Policy.subscribe(policy_id)
|
:ok = PubSub.Policy.subscribe(policy_id)
|
||||||
:ok = PubSub.Account.Policies.subscribe(account_id)
|
:ok = PubSub.Account.Policies.subscribe(account_id)
|
||||||
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
||||||
|
|
||||||
assert :ok == on_update(old_data, data)
|
assert :ok == on_update(old_data, data)
|
||||||
|
|
||||||
|
# TODO: WAL
|
||||||
|
# Remove this when side effects are directly broadcasted
|
||||||
|
Process.sleep(100)
|
||||||
|
|
||||||
assert_receive {:delete_policy, ^policy_id}
|
assert_receive {:delete_policy, ^policy_id}
|
||||||
assert_receive {:delete_policy, ^policy_id}
|
assert_receive {:delete_policy, ^policy_id}
|
||||||
assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id}
|
assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id}
|
||||||
@@ -160,13 +169,13 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "breaking update: disabled policy has no side-effects" do
|
test "breaking update: disabled policy has no side-effects" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
policy_id = flow.policy_id
|
policy_id = flow.policy_id
|
||||||
account_id = flow.account_id
|
account_id = flow.account_id
|
||||||
actor_group_id = "group-456"
|
actor_group_id = "group-456"
|
||||||
@@ -197,9 +206,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
refute_receive {:expire_flow, ^flow_id, ^client_id, "new-resource-123"}
|
||||||
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "non-breaking-update: broadcasts :update_policy" do
|
test "non-breaking-update: broadcasts :update_policy" do
|
||||||
@@ -233,6 +240,8 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
describe "delete/1" do
|
describe "delete/1" do
|
||||||
test "broadcasts :delete_policy and :reject_access" do
|
test "broadcasts :delete_policy and :reject_access" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
policy_id = flow.policy_id
|
policy_id = flow.policy_id
|
||||||
account_id = flow.account_id
|
account_id = flow.account_id
|
||||||
actor_group_id = "group-456"
|
actor_group_id = "group-456"
|
||||||
@@ -248,6 +257,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
|
|
||||||
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
|
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
|
||||||
|
|
||||||
|
:ok = PubSub.Flow.subscribe(flow_id)
|
||||||
:ok = PubSub.Policy.subscribe(policy_id)
|
:ok = PubSub.Policy.subscribe(policy_id)
|
||||||
:ok = PubSub.Account.Policies.subscribe(account_id)
|
:ok = PubSub.Account.Policies.subscribe(account_id)
|
||||||
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
|
||||||
@@ -261,9 +271,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do
|
|||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -21,19 +21,16 @@ defmodule Domain.Events.Hooks.ResourceConnectionsTest do
|
|||||||
describe "delete/1" do
|
describe "delete/1" do
|
||||||
test "returns :ok" do
|
test "returns :ok" do
|
||||||
flow = Fixtures.Flows.create_flow()
|
flow = Fixtures.Flows.create_flow()
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
assert :ok ==
|
assert :ok ==
|
||||||
on_delete(%{"account_id" => flow.account_id, "resource_id" => flow.resource_id})
|
on_delete(%{"account_id" => flow.account_id, "resource_id" => flow.resource_id})
|
||||||
|
|
||||||
# TODO: WAL
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
# Remove this when flow removal is directly broadcasted
|
|
||||||
Process.sleep(100)
|
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -68,6 +68,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do
|
test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do
|
||||||
|
:ok = PubSub.Flow.subscribe(flow.id)
|
||||||
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
||||||
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
||||||
|
|
||||||
@@ -78,12 +79,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
flow = Repo.reload(flow)
|
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
|
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
resource_id = flow.resource_id
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:create_resource, ^resource_id}
|
assert_receive {:create_resource, ^resource_id}
|
||||||
@@ -91,6 +92,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do
|
test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do
|
||||||
|
:ok = PubSub.Flow.subscribe(flow.id)
|
||||||
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
||||||
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
||||||
|
|
||||||
@@ -101,12 +103,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
flow = Repo.reload(flow)
|
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
|
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
resource_id = flow.resource_id
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:create_resource, ^resource_id}
|
assert_receive {:create_resource, ^resource_id}
|
||||||
@@ -114,6 +116,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do
|
test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do
|
||||||
|
:ok = PubSub.Flow.subscribe(flow.id)
|
||||||
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
||||||
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
||||||
|
|
||||||
@@ -124,12 +127,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
flow = Repo.reload(flow)
|
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
|
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
resource_id = flow.resource_id
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:create_resource, ^resource_id}
|
assert_receive {:create_resource, ^resource_id}
|
||||||
@@ -137,6 +140,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do
|
test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do
|
||||||
|
:ok = PubSub.Flow.subscribe(flow.id)
|
||||||
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
:ok = PubSub.Resource.subscribe(flow.resource_id)
|
||||||
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
|
||||||
|
|
||||||
@@ -147,12 +151,12 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
# TODO: WAL
|
# TODO: WAL
|
||||||
# Remove this after direct broadcast
|
# Remove this after direct broadcast
|
||||||
Process.sleep(100)
|
Process.sleep(100)
|
||||||
flow = Repo.reload(flow)
|
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :gt
|
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
resource_id = flow.resource_id
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:delete_resource, ^resource_id}
|
assert_receive {:delete_resource, ^resource_id}
|
||||||
assert_receive {:create_resource, ^resource_id}
|
assert_receive {:create_resource, ^resource_id}
|
||||||
@@ -167,10 +171,15 @@ defmodule Domain.Events.Hooks.ResourcesTest do
|
|||||||
|
|
||||||
assert :ok == on_update(old_data, data)
|
assert :ok == on_update(old_data, data)
|
||||||
|
|
||||||
assert DateTime.compare(DateTime.utc_now(), flow.expires_at) == :lt
|
# TODO: WAL
|
||||||
|
# Remove this after direct broadcast
|
||||||
|
Process.sleep(100)
|
||||||
|
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
resource_id = flow.resource_id
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
assert_receive {:update_resource, ^resource_id}
|
assert_receive {:update_resource, ^resource_id}
|
||||||
assert_receive {:update_resource, ^resource_id}
|
assert_receive {:update_resource, ^resource_id}
|
||||||
refute_receive {:delete_resource, ^resource_id}
|
refute_receive {:delete_resource, ^resource_id}
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ defmodule Domain.FlowsTest do
|
|||||||
policy: policy,
|
policy: policy,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, fetched_resource, _flow} =
|
assert {:ok, fetched_resource, _flow, _expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject)
|
authorize_flow(client, gateway, resource.id, subject)
|
||||||
|
|
||||||
assert fetched_resource.id == resource.id
|
assert fetched_resource.id == resource.id
|
||||||
@@ -198,11 +198,11 @@ defmodule Domain.FlowsTest do
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
assert {:ok, _fetched_resource, flow} =
|
assert {:ok, _fetched_resource, flow, expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject)
|
authorize_flow(client, gateway, resource.id, subject)
|
||||||
|
|
||||||
assert flow.policy_id == policy.id
|
assert flow.policy_id == policy.id
|
||||||
assert DateTime.diff(flow.expires_at, DateTime.new!(date, midnight)) < 5
|
assert DateTime.diff(expires_at, DateTime.new!(date, midnight)) < 5
|
||||||
end
|
end
|
||||||
|
|
||||||
test "creates a flow when all conditions for at least one of the policies are satisfied", %{
|
test "creates a flow when all conditions for at least one of the policies are satisfied", %{
|
||||||
@@ -237,10 +237,11 @@ defmodule Domain.FlowsTest do
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
assert {:ok, _fetched_resource, flow} =
|
assert {:ok, _fetched_resource, flow, expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject)
|
authorize_flow(client, gateway, resource.id, subject)
|
||||||
|
|
||||||
assert flow.expires_at == subject.expires_at
|
assert flow.resource_id == resource.id
|
||||||
|
assert expires_at == subject.expires_at
|
||||||
end
|
end
|
||||||
|
|
||||||
test "creates a network flow for users", %{
|
test "creates a network flow for users", %{
|
||||||
@@ -256,7 +257,7 @@ defmodule Domain.FlowsTest do
|
|||||||
|
|
||||||
Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group)
|
Fixtures.Actors.create_membership(account: account, actor: actor, group: actor_group)
|
||||||
|
|
||||||
assert {:ok, _fetched_resource, %Flows.Flow{} = flow} =
|
assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject)
|
authorize_flow(client, gateway, resource.id, subject)
|
||||||
|
|
||||||
assert flow.policy_id == policy.id
|
assert flow.policy_id == policy.id
|
||||||
@@ -267,7 +268,7 @@ defmodule Domain.FlowsTest do
|
|||||||
assert flow.client_remote_ip.address == subject.context.remote_ip
|
assert flow.client_remote_ip.address == subject.context.remote_ip
|
||||||
assert flow.client_user_agent == subject.context.user_agent
|
assert flow.client_user_agent == subject.context.user_agent
|
||||||
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
|
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
|
||||||
assert flow.expires_at == subject.expires_at
|
assert expires_at == subject.expires_at
|
||||||
end
|
end
|
||||||
|
|
||||||
test "creates a network flow for service accounts", %{
|
test "creates a network flow for service accounts", %{
|
||||||
@@ -285,7 +286,7 @@ defmodule Domain.FlowsTest do
|
|||||||
|
|
||||||
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
|
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
|
||||||
|
|
||||||
assert {:ok, _fetched_resource, %Flows.Flow{} = flow} =
|
assert {:ok, _fetched_resource, %Flows.Flow{} = flow, expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject)
|
authorize_flow(client, gateway, resource.id, subject)
|
||||||
|
|
||||||
assert flow.policy_id == policy.id
|
assert flow.policy_id == policy.id
|
||||||
@@ -296,7 +297,7 @@ defmodule Domain.FlowsTest do
|
|||||||
assert flow.client_remote_ip.address == subject.context.remote_ip
|
assert flow.client_remote_ip.address == subject.context.remote_ip
|
||||||
assert flow.client_user_agent == subject.context.user_agent
|
assert flow.client_user_agent == subject.context.user_agent
|
||||||
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
|
assert flow.gateway_remote_ip == gateway.last_seen_remote_ip
|
||||||
assert flow.expires_at == subject.expires_at
|
assert expires_at == subject.expires_at
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not return authorized access to deleted resources", %{
|
test "does not return authorized access to deleted resources", %{
|
||||||
@@ -375,7 +376,7 @@ defmodule Domain.FlowsTest do
|
|||||||
resource: resource,
|
resource: resource,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, resource, _flow} =
|
assert {:ok, resource, _flow, _expires_at} =
|
||||||
authorize_flow(client, gateway, resource.id, subject, preload: :connections)
|
authorize_flow(client, gateway, resource.id, subject, preload: :connections)
|
||||||
|
|
||||||
assert Ecto.assoc_loaded?(resource.connections)
|
assert Ecto.assoc_loaded?(resource.connections)
|
||||||
@@ -929,27 +930,36 @@ defmodule Domain.FlowsTest do
|
|||||||
flow: flow,
|
flow: flow,
|
||||||
actor_group: actor_group
|
actor_group: actor_group
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(actor_group)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(actor_group)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for client identity", %{
|
test "expires flows for client identity", %{
|
||||||
flow: flow,
|
flow: flow,
|
||||||
identity: identity
|
identity: identity
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(identity)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(identity)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for client", %{
|
test "expires flows for client", %{
|
||||||
flow: flow,
|
flow: flow,
|
||||||
client: client
|
client: client
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(client)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(client)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -982,11 +992,13 @@ defmodule Domain.FlowsTest do
|
|||||||
actor: actor,
|
actor: actor,
|
||||||
policy: policy
|
policy: policy
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} =
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
expire_flows_for(actor.account_id, actor.id, policy.actor_group_id)
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
assert :ok = expire_flows_for(actor.account_id, actor.id, policy.actor_group_id)
|
||||||
assert expired_flow.id == flow.id
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for actor", %{
|
test "expires flows for actor", %{
|
||||||
@@ -994,18 +1006,24 @@ defmodule Domain.FlowsTest do
|
|||||||
actor: actor,
|
actor: actor,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(actor, subject)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(actor, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for policy", %{
|
test "expires flows for policy", %{
|
||||||
flow: flow,
|
flow: flow,
|
||||||
policy: policy
|
policy: policy
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for_policy_id(policy.account_id, policy.id)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for_policy_id(policy.account_id, policy.id)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for resource", %{
|
test "expires flows for resource", %{
|
||||||
@@ -1013,9 +1031,12 @@ defmodule Domain.FlowsTest do
|
|||||||
resource: resource,
|
resource: resource,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(resource, subject)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(resource, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for policy actor group", %{
|
test "expires flows for policy actor group", %{
|
||||||
@@ -1023,9 +1044,12 @@ defmodule Domain.FlowsTest do
|
|||||||
actor_group: actor_group,
|
actor_group: actor_group,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(actor_group, subject)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(actor_group, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for client identity", %{
|
test "expires flows for client identity", %{
|
||||||
@@ -1033,9 +1057,12 @@ defmodule Domain.FlowsTest do
|
|||||||
identity: identity,
|
identity: identity,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(identity, subject)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(identity, subject)
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "expires flows for client identity provider", %{
|
test "expires flows for client identity provider", %{
|
||||||
@@ -1043,20 +1070,12 @@ defmodule Domain.FlowsTest do
|
|||||||
provider: provider,
|
provider: provider,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [expired_flow]} = expire_flows_for(provider, subject)
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1
|
flow_id = flow.id
|
||||||
assert expired_flow.id == flow.id
|
client_id = flow.client_id
|
||||||
end
|
resource_id = flow.resource_id
|
||||||
|
assert :ok = expire_flows_for(provider, subject)
|
||||||
test "updates flow expiration expires_at", %{
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
flow: flow,
|
|
||||||
actor: actor,
|
|
||||||
subject: subject
|
|
||||||
} do
|
|
||||||
assert {:ok, [_expired_flow]} = expire_flows_for(actor, subject)
|
|
||||||
|
|
||||||
flow = Repo.reload(flow)
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "returns error when subject has no permission to expire flows", %{
|
test "returns error when subject has no permission to expire flows", %{
|
||||||
@@ -1077,16 +1096,16 @@ defmodule Domain.FlowsTest do
|
|||||||
actor_group: actor_group,
|
actor_group: actor_group,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
assert {:ok, [_expired_flow]} = expire_flows_for(resource, subject)
|
assert :ok = expire_flows_for(resource, subject)
|
||||||
assert {:ok, []} = expire_flows_for(actor_group, subject)
|
assert :ok = expire_flows_for(actor_group, subject)
|
||||||
assert {:ok, []} = expire_flows_for(resource, subject)
|
assert :ok = expire_flows_for(resource, subject)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not expire flows outside of account", %{
|
test "does not expire flows outside of account", %{
|
||||||
resource: resource
|
resource: resource
|
||||||
} do
|
} do
|
||||||
subject = Fixtures.Auth.create_subject()
|
subject = Fixtures.Auth.create_subject()
|
||||||
assert {:ok, []} = expire_flows_for(resource, subject)
|
assert :ok = expire_flows_for(resource, subject)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1531,12 +1531,15 @@ defmodule Domain.ResourcesTest do
|
|||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
|
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow_id)
|
||||||
|
|
||||||
attrs = %{"name" => "foo"}
|
attrs = %{"name" => "foo"}
|
||||||
assert {:ok, _resource} = update_resource(resource, attrs, subject)
|
assert {:ok, _resource} = update_resource(resource, attrs, subject)
|
||||||
|
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
flow = Repo.reload(flow)
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "allows to update connections", %{account: account, resource: resource, subject: subject} do
|
test "allows to update connections", %{account: account, resource: resource, subject: subject} do
|
||||||
@@ -1551,6 +1554,11 @@ defmodule Domain.ResourcesTest do
|
|||||||
gateway2 = Fixtures.Gateways.create_gateway(account: account)
|
gateway2 = Fixtures.Gateways.create_gateway(account: account)
|
||||||
|
|
||||||
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
|
flow = Fixtures.Flows.create_flow(account: account, resource: resource, subject: subject)
|
||||||
|
flow_id = flow.id
|
||||||
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow_id)
|
||||||
|
|
||||||
attrs = %{
|
attrs = %{
|
||||||
"connections" => [
|
"connections" => [
|
||||||
@@ -1575,11 +1583,7 @@ defmodule Domain.ResourcesTest do
|
|||||||
"resource_id" => resource.id
|
"resource_id" => resource.id
|
||||||
})
|
})
|
||||||
|
|
||||||
# TODO: WAL
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
# Remove this after direct broadcast
|
|
||||||
Process.sleep(100)
|
|
||||||
flow = Repo.reload(flow)
|
|
||||||
assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not allow to remove all connections", %{resource: resource, subject: subject} do
|
test "does not allow to remove all connections", %{resource: resource, subject: subject} do
|
||||||
|
|||||||
@@ -511,16 +511,22 @@ defmodule Domain.TokensTest do
|
|||||||
identity: identity,
|
identity: identity,
|
||||||
subject: subject
|
subject: subject
|
||||||
} do
|
} do
|
||||||
Fixtures.Flows.create_flow(
|
flow =
|
||||||
account: account,
|
Fixtures.Flows.create_flow(
|
||||||
identity: identity,
|
account: account,
|
||||||
subject: subject
|
identity: identity,
|
||||||
)
|
subject: subject
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok = Domain.PubSub.Flow.subscribe(flow.id)
|
||||||
|
|
||||||
assert {:ok, _token} = delete_token_for(subject)
|
assert {:ok, _token} = delete_token_for(subject)
|
||||||
|
|
||||||
expires_at = Repo.one(Domain.Flows.Flow).expires_at
|
flow_id = flow.id
|
||||||
assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1
|
client_id = flow.client_id
|
||||||
|
resource_id = flow.resource_id
|
||||||
|
|
||||||
|
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not delete tokens for other actors", %{account: account, subject: subject} do
|
test "does not delete tokens for other actors", %{account: account, subject: subject} do
|
||||||
|
|||||||
@@ -82,8 +82,7 @@ defmodule Domain.Fixtures.Flows do
|
|||||||
account_id: account.id,
|
account_id: account.id,
|
||||||
client_remote_ip: client.last_seen_remote_ip,
|
client_remote_ip: client.last_seen_remote_ip,
|
||||||
client_user_agent: client.last_seen_user_agent,
|
client_user_agent: client.last_seen_user_agent,
|
||||||
gateway_remote_ip: gateway.last_seen_remote_ip,
|
gateway_remote_ip: gateway.last_seen_remote_ip
|
||||||
expires_at: subject.expires_at
|
|
||||||
})
|
})
|
||||||
|> Repo.insert!()
|
|> Repo.insert!()
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -562,11 +562,6 @@ defmodule Web.Actors.Show do
|
|||||||
<br />
|
<br />
|
||||||
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
||||||
</:col>
|
</:col>
|
||||||
<:col :let={flow} :if={@flow_activities_enabled?} label="activity" class="w-1/12">
|
|
||||||
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
|
|
||||||
Show
|
|
||||||
</.link>
|
|
||||||
</:col>
|
|
||||||
<:empty>
|
<:empty>
|
||||||
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
||||||
</:empty>
|
</:empty>
|
||||||
|
|||||||
@@ -319,11 +319,6 @@ defmodule Web.Clients.Show do
|
|||||||
<br />
|
<br />
|
||||||
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
||||||
</:col>
|
</:col>
|
||||||
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
|
|
||||||
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
|
|
||||||
Show
|
|
||||||
</.link>
|
|
||||||
</:col>
|
|
||||||
<:empty>
|
<:empty>
|
||||||
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
||||||
</:empty>
|
</:empty>
|
||||||
|
|||||||
@@ -1,75 +0,0 @@
|
|||||||
defmodule Web.Flows.DownloadActivities do
|
|
||||||
use Web, :controller
|
|
||||||
alias Domain.Flows
|
|
||||||
|
|
||||||
def download(conn, %{"id" => id}) do
|
|
||||||
with {:ok, flow} <- Flows.fetch_flow_by_id(id, conn.assigns.subject) do
|
|
||||||
conn
|
|
||||||
|> put_resp_content_type("text/csv")
|
|
||||||
|> put_resp_header(
|
|
||||||
"content-disposition",
|
|
||||||
"attachment; filename=\"flow-activities-#{flow.id}.csv\""
|
|
||||||
)
|
|
||||||
|> put_root_layout(false)
|
|
||||||
|> send_chunked(200)
|
|
||||||
|> send_csv_header()
|
|
||||||
|> stream_csv_body(flow)
|
|
||||||
else
|
|
||||||
{:error, _reason} -> raise Web.LiveErrors.NotFoundError
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp send_csv_header(conn) do
|
|
||||||
iodata =
|
|
||||||
Web.CSV.dump_to_iodata([~w[
|
|
||||||
window_started_at window_ended_at
|
|
||||||
destination
|
|
||||||
connectivity_type
|
|
||||||
rx_bytes tx_bytes blocked_tx_bytes
|
|
||||||
]])
|
|
||||||
|
|
||||||
{:ok, conn} = chunk(conn, iodata)
|
|
||||||
conn
|
|
||||||
end
|
|
||||||
|
|
||||||
defp stream_csv_body(conn, flow, cursor \\ nil) do
|
|
||||||
with {:ok, activities, activities_metadata} <-
|
|
||||||
Flows.list_flow_activities_for(
|
|
||||||
flow,
|
|
||||||
conn.assigns.subject,
|
|
||||||
page: [cursor: cursor, limit: 100]
|
|
||||||
),
|
|
||||||
{:ok, conn} <- stream_csv_rows(conn, activities) do
|
|
||||||
if activities_metadata.next_page_cursor do
|
|
||||||
stream_csv_body(conn, flow, activities_metadata.next_page_cursor)
|
|
||||||
else
|
|
||||||
conn
|
|
||||||
end
|
|
||||||
else
|
|
||||||
{:error, _reason} ->
|
|
||||||
conn
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp stream_csv_rows(conn, activities) do
|
|
||||||
iodata =
|
|
||||||
activities
|
|
||||||
|> Enum.map(fn activity ->
|
|
||||||
[
|
|
||||||
to_string(activity.window_started_at),
|
|
||||||
to_string(activity.window_ended_at),
|
|
||||||
to_string(activity.destination),
|
|
||||||
to_string(activity.connectivity_type),
|
|
||||||
activity.rx_bytes,
|
|
||||||
activity.tx_bytes,
|
|
||||||
activity.blocked_tx_bytes
|
|
||||||
]
|
|
||||||
end)
|
|
||||||
|> Web.CSV.dump_to_iodata()
|
|
||||||
|
|
||||||
case chunk(conn, iodata) do
|
|
||||||
{:ok, conn} -> {:ok, conn}
|
|
||||||
{:error, reason} -> {:error, reason}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,190 +0,0 @@
|
|||||||
defmodule Web.Flows.Show do
|
|
||||||
use Web, :live_view
|
|
||||||
import Web.Policies.Components
|
|
||||||
alias Domain.{Accounts, Flows}
|
|
||||||
|
|
||||||
def mount(%{"id" => id}, _session, socket) do
|
|
||||||
with true <- Accounts.flow_activities_enabled?(socket.assigns.account),
|
|
||||||
{:ok, flow} <-
|
|
||||||
Flows.fetch_flow_by_id(id, socket.assigns.subject,
|
|
||||||
preload: [
|
|
||||||
policy: [:resource, :actor_group],
|
|
||||||
client: [],
|
|
||||||
gateway: [:group],
|
|
||||||
resource: []
|
|
||||||
]
|
|
||||||
) do
|
|
||||||
last_used_connectivity_type = get_last_used_connectivity_type(flow, socket.assigns.subject)
|
|
||||||
|
|
||||||
socket =
|
|
||||||
socket
|
|
||||||
|> assign(
|
|
||||||
page_title: "Flow #{flow.id}",
|
|
||||||
flow: flow,
|
|
||||||
last_used_connectivity_type: last_used_connectivity_type
|
|
||||||
)
|
|
||||||
|> assign_live_table("activities",
|
|
||||||
query_module: Flows.Activity.Query,
|
|
||||||
sortable_fields: [],
|
|
||||||
callback: &handle_activities_update!/2
|
|
||||||
)
|
|
||||||
|
|
||||||
{:ok, socket}
|
|
||||||
else
|
|
||||||
_other -> raise Web.LiveErrors.NotFoundError
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp get_last_used_connectivity_type(flow, subject) do
|
|
||||||
case Flows.fetch_last_activity_for(flow, subject) do
|
|
||||||
{:ok, activity} -> to_string(activity.connectivity_type)
|
|
||||||
_other -> "N/A"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_params(params, uri, socket) do
|
|
||||||
socket = handle_live_tables_params(socket, params, uri)
|
|
||||||
{:noreply, socket}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_activities_update!(socket, list_opts) do
|
|
||||||
with {:ok, activities, metadata} <-
|
|
||||||
Flows.list_flow_activities_for(socket.assigns.flow, socket.assigns.subject, list_opts) do
|
|
||||||
{:ok,
|
|
||||||
assign(socket,
|
|
||||||
activities: activities,
|
|
||||||
activities_metadata: metadata
|
|
||||||
)}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def render(assigns) do
|
|
||||||
~H"""
|
|
||||||
<.breadcrumbs account={@account}>
|
|
||||||
<.breadcrumb>Flows</.breadcrumb>
|
|
||||||
<.breadcrumb path={~p"/#{@account}/flows/#{@flow.id}"}>
|
|
||||||
{@flow.client.name} flow
|
|
||||||
</.breadcrumb>
|
|
||||||
</.breadcrumbs>
|
|
||||||
|
|
||||||
<.section>
|
|
||||||
<:title>
|
|
||||||
Flow for: <code>{@flow.client.name}</code>
|
|
||||||
</:title>
|
|
||||||
<:action>
|
|
||||||
<.button
|
|
||||||
navigate={~p"/#{@account}/flows/#{@flow}/activities.csv"}
|
|
||||||
icon="hero-arrow-down-on-square"
|
|
||||||
>
|
|
||||||
Export to CSV
|
|
||||||
</.button>
|
|
||||||
</:action>
|
|
||||||
<:content flash={@flash}>
|
|
||||||
<.vertical_table id="flow">
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Authorized At</:label>
|
|
||||||
<:value>
|
|
||||||
<.relative_datetime datetime={@flow.inserted_at} />
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Expires At</:label>
|
|
||||||
<:value>
|
|
||||||
<.relative_datetime datetime={@flow.expires_at} />
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Policy</:label>
|
|
||||||
<:value>
|
|
||||||
<.link navigate={~p"/#{@account}/policies/#{@flow.policy_id}"} class={link_style()}>
|
|
||||||
<.policy_name policy={@flow.policy} />
|
|
||||||
</.link>
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Client</:label>
|
|
||||||
<:value>
|
|
||||||
<.link navigate={~p"/#{@account}/clients/#{@flow.client_id}"} class={link_style()}>
|
|
||||||
{@flow.client.name}
|
|
||||||
</.link>
|
|
||||||
<div>Remote IP: {@flow.client_remote_ip}</div>
|
|
||||||
<div>User Agent: {@flow.client_user_agent}</div>
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Gateway</:label>
|
|
||||||
<:value>
|
|
||||||
<.link navigate={~p"/#{@account}/gateways/#{@flow.gateway_id}"} class={link_style()}>
|
|
||||||
{@flow.gateway.group.name}-{@flow.gateway.name}
|
|
||||||
</.link>
|
|
||||||
<div>
|
|
||||||
Remote IP: {@flow.gateway_remote_ip}
|
|
||||||
</div>
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Resource</:label>
|
|
||||||
<:value>
|
|
||||||
<.link navigate={~p"/#{@account}/resources/#{@flow.resource_id}"} class={link_style()}>
|
|
||||||
{@flow.resource.name}
|
|
||||||
</.link>
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
<.vertical_table_row>
|
|
||||||
<:label>Connectivity Type</:label>
|
|
||||||
<:value>
|
|
||||||
{@last_used_connectivity_type}
|
|
||||||
</:value>
|
|
||||||
</.vertical_table_row>
|
|
||||||
</.vertical_table>
|
|
||||||
</:content>
|
|
||||||
</.section>
|
|
||||||
|
|
||||||
<.section>
|
|
||||||
<:title>Metrics</:title>
|
|
||||||
<:help>
|
|
||||||
Pre-aggregated metrics for this flow.
|
|
||||||
</:help>
|
|
||||||
<:content>
|
|
||||||
<.live_table
|
|
||||||
id="activities"
|
|
||||||
rows={@activities}
|
|
||||||
row_id={&"activities-#{&1.id}"}
|
|
||||||
filters={@filters_by_table_id["activities"]}
|
|
||||||
filter={@filter_form_by_table_id["activities"]}
|
|
||||||
ordered_by={@order_by_table_id["activities"]}
|
|
||||||
metadata={@activities_metadata}
|
|
||||||
>
|
|
||||||
<:col :let={activity} label="started">
|
|
||||||
<.relative_datetime datetime={activity.window_started_at} />
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="ended">
|
|
||||||
<.relative_datetime datetime={activity.window_ended_at} />
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="destination">
|
|
||||||
{activity.destination}
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="connectivity type">
|
|
||||||
{activity.connectivity_type}
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="rx">
|
|
||||||
{Sizeable.filesize(activity.rx_bytes)}
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="tx">
|
|
||||||
{Sizeable.filesize(activity.tx_bytes)}
|
|
||||||
</:col>
|
|
||||||
<:col :let={activity} label="blocked tx">
|
|
||||||
{Sizeable.filesize(activity.blocked_tx_bytes)}
|
|
||||||
</:col>
|
|
||||||
<:empty>
|
|
||||||
<div class="text-center text-neutral-500 p-4">No metrics to display.</div>
|
|
||||||
</:empty>
|
|
||||||
</.live_table>
|
|
||||||
</:content>
|
|
||||||
</.section>
|
|
||||||
"""
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_event(event, params, socket) when event in ["paginate", "order_by", "filter"],
|
|
||||||
do: handle_live_table_event(event, params, socket)
|
|
||||||
end
|
|
||||||
@@ -270,11 +270,6 @@ defmodule Web.Policies.Show do
|
|||||||
<br />
|
<br />
|
||||||
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
||||||
</:col>
|
</:col>
|
||||||
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
|
|
||||||
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={link_style()}>
|
|
||||||
Show
|
|
||||||
</.link>
|
|
||||||
</:col>
|
|
||||||
<:empty>
|
<:empty>
|
||||||
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
||||||
</:empty>
|
</:empty>
|
||||||
|
|||||||
@@ -366,11 +366,6 @@ defmodule Web.Resources.Show do
|
|||||||
<br />
|
<br />
|
||||||
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
<code class="text-xs">{flow.gateway_remote_ip}</code>
|
||||||
</:col>
|
</:col>
|
||||||
<:col :let={flow} :if={@flow_activities_enabled?} label="activity">
|
|
||||||
<.link navigate={~p"/#{@account}/flows/#{flow.id}"} class={[link_style()]}>
|
|
||||||
Show
|
|
||||||
</.link>
|
|
||||||
</:col>
|
|
||||||
<:empty>
|
<:empty>
|
||||||
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
<div class="text-center text-neutral-500 p-4">No activity to display.</div>
|
||||||
</:empty>
|
</:empty>
|
||||||
|
|||||||
@@ -207,11 +207,6 @@ defmodule Web.Router do
|
|||||||
live "/:id", Show
|
live "/:id", Show
|
||||||
end
|
end
|
||||||
|
|
||||||
scope "/flows", Flows do
|
|
||||||
live "/:id", Show
|
|
||||||
get "/:id/activities.csv", DownloadActivities, :download
|
|
||||||
end
|
|
||||||
|
|
||||||
scope "/settings", Settings do
|
scope "/settings", Settings do
|
||||||
scope "/account" do
|
scope "/account" do
|
||||||
live "/", Account
|
live "/", Account
|
||||||
|
|||||||
@@ -1,224 +0,0 @@
|
|||||||
defmodule Web.Live.Flows.ShowTest do
|
|
||||||
use Web.ConnCase, async: true
|
|
||||||
|
|
||||||
setup do
|
|
||||||
account = Fixtures.Accounts.create_account()
|
|
||||||
actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account)
|
|
||||||
identity = Fixtures.Auth.create_identity(account: account, actor: actor)
|
|
||||||
subject = Fixtures.Auth.create_subject(account: account, actor: actor, identity: identity)
|
|
||||||
|
|
||||||
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
|
|
||||||
flow = Fixtures.Flows.create_flow(account: account, client: client)
|
|
||||||
|
|
||||||
%{
|
|
||||||
account: account,
|
|
||||||
actor: actor,
|
|
||||||
identity: identity,
|
|
||||||
subject: subject,
|
|
||||||
client: client,
|
|
||||||
flow: flow
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "redirects to sign in page for unauthorized user", %{
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
path = ~p"/#{account}/flows/#{flow}"
|
|
||||||
|
|
||||||
assert live(conn, path) ==
|
|
||||||
{:error,
|
|
||||||
{:redirect,
|
|
||||||
%{
|
|
||||||
to: ~p"/#{account}?#{%{redirect_to: path}}",
|
|
||||||
flash: %{"error" => "You must sign in to access this page."}
|
|
||||||
}}}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "renders 404 error when flow activities are not enabled", %{
|
|
||||||
account: account,
|
|
||||||
identity: identity,
|
|
||||||
flow: flow,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
{:ok, account} =
|
|
||||||
Domain.Accounts.update_account(account, %{
|
|
||||||
features: %{
|
|
||||||
flow_activities: false
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
assert_raise Web.LiveErrors.NotFoundError, fn ->
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> live(~p"/#{account}/flows/#{flow}") =~ "404"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
test "renders breadcrumbs item", %{
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
client: client,
|
|
||||||
identity: identity,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
{:ok, _lv, html} =
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> live(~p"/#{account}/flows/#{flow}")
|
|
||||||
|
|
||||||
assert item = Floki.find(html, "[aria-label='Breadcrumb']")
|
|
||||||
breadcrumbs = String.trim(Floki.text(item))
|
|
||||||
assert breadcrumbs =~ "Flows"
|
|
||||||
assert breadcrumbs =~ "#{client.name} flow"
|
|
||||||
end
|
|
||||||
|
|
||||||
test "renders flows details", %{
|
|
||||||
account: account,
|
|
||||||
identity: identity,
|
|
||||||
flow: flow,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
flow =
|
|
||||||
Repo.preload(flow,
|
|
||||||
policy: [:resource, :actor_group],
|
|
||||||
client: [],
|
|
||||||
gateway: [:group],
|
|
||||||
resource: []
|
|
||||||
)
|
|
||||||
|
|
||||||
activity =
|
|
||||||
Fixtures.Flows.create_activity(
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
window_started_at: DateTime.truncate(flow.inserted_at, :second),
|
|
||||||
window_ended_at: DateTime.truncate(flow.expires_at, :second)
|
|
||||||
)
|
|
||||||
|
|
||||||
{:ok, lv, _html} =
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> live(~p"/#{account}/flows/#{flow}")
|
|
||||||
|
|
||||||
table =
|
|
||||||
lv
|
|
||||||
|> element("#flow")
|
|
||||||
|> render()
|
|
||||||
|> vertical_table_to_map()
|
|
||||||
|
|
||||||
assert table["authorized at"]
|
|
||||||
assert table["expires at"]
|
|
||||||
|
|
||||||
assert table["connectivity type"] =~ to_string(activity.connectivity_type)
|
|
||||||
|
|
||||||
assert table["client"] =~ flow.client.name
|
|
||||||
assert table["client"] =~ to_string(flow.client_remote_ip)
|
|
||||||
assert table["client"] =~ flow.client_user_agent
|
|
||||||
|
|
||||||
assert table["gateway"] =~ flow.gateway.name
|
|
||||||
assert table["gateway"] =~ to_string(flow.gateway_remote_ip)
|
|
||||||
|
|
||||||
assert table["resource"] =~ flow.resource.name
|
|
||||||
|
|
||||||
assert table["policy"] =~ flow.policy.resource.name
|
|
||||||
assert table["policy"] =~ flow.policy.actor_group.name
|
|
||||||
end
|
|
||||||
|
|
||||||
test "allows downloading activities", %{
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
identity: identity,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
activity =
|
|
||||||
Fixtures.Flows.create_activity(
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
window_started_at: DateTime.truncate(flow.inserted_at, :second),
|
|
||||||
window_ended_at: DateTime.truncate(flow.expires_at, :second)
|
|
||||||
)
|
|
||||||
|
|
||||||
{:ok, lv, _html} =
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> live(~p"/#{account}/flows/#{flow}")
|
|
||||||
|
|
||||||
lv
|
|
||||||
|> element("a", "Export to CSV")
|
|
||||||
|> render_click()
|
|
||||||
|
|
||||||
assert_redirected(lv, ~p"/#{account}/flows/#{flow}/activities.csv")
|
|
||||||
|
|
||||||
controller_conn = get(conn, ~p"/#{account}/flows/#{flow}/activities.csv")
|
|
||||||
assert redirected_to(controller_conn) =~ ~p"/#{account}"
|
|
||||||
assert flash(controller_conn, :error) == "You must sign in to access this page."
|
|
||||||
|
|
||||||
controller_conn =
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> get(~p"/#{account}/flows/#{flow}/activities.csv")
|
|
||||||
|
|
||||||
assert response = response(controller_conn, 200)
|
|
||||||
|
|
||||||
assert response
|
|
||||||
|> String.trim()
|
|
||||||
|> String.split("\n")
|
|
||||||
|> Enum.map(&String.split(&1, "\t")) ==
|
|
||||||
[
|
|
||||||
[
|
|
||||||
"window_started_at",
|
|
||||||
"window_ended_at",
|
|
||||||
"destination",
|
|
||||||
"connectivity_type",
|
|
||||||
"rx_bytes",
|
|
||||||
"tx_bytes",
|
|
||||||
"blocked_tx_bytes"
|
|
||||||
],
|
|
||||||
[
|
|
||||||
to_string(activity.window_started_at),
|
|
||||||
to_string(activity.window_ended_at),
|
|
||||||
to_string(activity.destination),
|
|
||||||
to_string(activity.connectivity_type),
|
|
||||||
to_string(activity.rx_bytes),
|
|
||||||
to_string(activity.tx_bytes),
|
|
||||||
to_string(activity.blocked_tx_bytes)
|
|
||||||
]
|
|
||||||
]
|
|
||||||
end
|
|
||||||
|
|
||||||
test "renders activities table", %{
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
identity: identity,
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
activity =
|
|
||||||
Fixtures.Flows.create_activity(
|
|
||||||
account: account,
|
|
||||||
flow: flow,
|
|
||||||
window_started_at: DateTime.truncate(flow.inserted_at, :second),
|
|
||||||
window_ended_at: DateTime.truncate(flow.expires_at, :second),
|
|
||||||
tx_bytes: 1024 * 1024 * 1024 * 42
|
|
||||||
)
|
|
||||||
|
|
||||||
{:ok, lv, _html} =
|
|
||||||
conn
|
|
||||||
|> authorize_conn(identity)
|
|
||||||
|> live(~p"/#{account}/flows/#{flow}")
|
|
||||||
|
|
||||||
[row] =
|
|
||||||
lv
|
|
||||||
|> element("#activities")
|
|
||||||
|> render()
|
|
||||||
|> table_to_map()
|
|
||||||
|
|
||||||
assert row["started"]
|
|
||||||
assert row["ended"]
|
|
||||||
|
|
||||||
assert row["connectivity type"] == to_string(activity.connectivity_type)
|
|
||||||
assert row["destination"] == to_string(activity.destination)
|
|
||||||
assert row["rx"] == "#{activity.rx_bytes} B"
|
|
||||||
assert row["tx"] == "42 GB"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -59,7 +59,6 @@ config :domain, Domain.ChangeLogs.ReplicationConnection,
|
|||||||
auth_providers
|
auth_providers
|
||||||
clients
|
clients
|
||||||
flow_activities
|
flow_activities
|
||||||
flows
|
|
||||||
gateway_groups
|
gateway_groups
|
||||||
gateways
|
gateways
|
||||||
policies
|
policies
|
||||||
@@ -95,7 +94,6 @@ config :domain, Domain.Events.ReplicationConnection,
|
|||||||
auth_providers
|
auth_providers
|
||||||
clients
|
clients
|
||||||
flow_activities
|
flow_activities
|
||||||
flows
|
|
||||||
gateway_groups
|
gateway_groups
|
||||||
gateways
|
gateways
|
||||||
policies
|
policies
|
||||||
|
|||||||
Reference in New Issue
Block a user