diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index 66087a6d7..212817a91 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -138,8 +138,8 @@ defmodule API.Client.Channel do # We subscribe for policy access events for the actor and the groups the client is a member of, actor_group_ids = Actors.all_actor_group_ids!(socket.assigns.subject.actor) - :ok = Enum.each(actor_group_ids, &Policies.subscribe_to_events_for_actor_group/1) - :ok = Policies.subscribe_to_events_for_actor(socket.assigns.subject.actor) + :ok = Enum.each(actor_group_ids, &Events.Hooks.ActorGroups.subscribe_to_policies/1) + :ok = Events.Hooks.Actors.subscribe_to_policies(socket.assigns.subject.actor.id) {:ok, socket} = init(socket) @@ -299,12 +299,12 @@ defmodule API.Client.Channel do # Those events are broadcasted by Actors whenever a membership is created or deleted def handle_info({:create_membership, _actor_id, group_id}, socket) do - :ok = Policies.subscribe_to_events_for_actor_group(group_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(group_id) {:noreply, socket} end def handle_info({:delete_membership, _actor_id, group_id}, socket) do - :ok = Policies.unsubscribe_from_events_for_actor_group(group_id) + :ok = Events.Hooks.ActorGroups.unsubscribe_from_policies(group_id) {:noreply, socket} end diff --git a/elixir/apps/api/lib/api/controllers/policy_controller.ex b/elixir/apps/api/lib/api/controllers/policy_controller.ex index 4acf772ee..516157a7e 100644 --- a/elixir/apps/api/lib/api/controllers/policy_controller.ex +++ b/elixir/apps/api/lib/api/controllers/policy_controller.ex @@ -93,8 +93,8 @@ defmodule API.PolicyController do with {:ok, policy} <- Policies.fetch_policy_by_id_or_persistent_id(id, subject) do case Policies.update_policy(policy, params, subject) do - {:updated, updated_policy} -> - render(conn, :show, policy: updated_policy) + {:ok, policy} -> + render(conn, :show, policy: policy) {:error, reason} -> {:error, reason} diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index 03597e827..3e342a7ea 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -708,7 +708,21 @@ defmodule API.Client.ChannelTest do subject: subject } do assert_push "init", %{} - {:ok, _resource} = Domain.Policies.disable_policy(dns_resource_policy, subject) + {:ok, policy} = Domain.Policies.disable_policy(dns_resource_policy, subject) + + # Simulate disable + old_data = %{ + "id" => policy.id, + "account_id" => policy.account_id, + "resource_id" => policy.resource_id, + "actor_group_id" => policy.actor_group_id, + "conditions" => [], + "disabled_at" => nil + } + + data = Map.put(old_data, "disabled_at", "2024-01-01T00:00:00Z") + Events.Hooks.Policies.on_update(old_data, data) + assert_push "resource_deleted", _payload refute_push "resource_created_or_updated", _payload end @@ -935,6 +949,19 @@ defmodule API.Client.ChannelTest do Fixtures.Policies.disable_policy(policy) + # Simulate disable + old_data = %{ + "id" => policy.id, + "account_id" => policy.account_id, + "resource_id" => policy.resource_id, + "actor_group_id" => policy.actor_group_id, + "conditions" => [], + "disabled_at" => nil + } + + data = Map.put(old_data, "disabled_at", "2024-01-01T00:00:00Z") + Events.Hooks.Policies.on_update(old_data, data) + assert_push "resource_deleted", resource_id assert resource_id == resource.id diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex index f49b5fa8d..0b7214f7d 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -2,9 +2,7 @@ defmodule Domain.Events.Hooks.Accounts do alias Domain.PubSub require Logger - def on_insert(_data) do - :ok - end + def on_insert(_data), do: :ok # Account disabled - disconnect clients def on_update( @@ -43,6 +41,12 @@ defmodule Domain.Events.Hooks.Accounts do |> PubSub.subscribe() end + def subscribe_to_policies(account_id) do + account_id + |> policies_topic() + |> PubSub.subscribe() + end + def subscribe_to_clients_presence(account_id) do account_id |> clients_presence_topic() @@ -73,10 +77,20 @@ defmodule Domain.Events.Hooks.Accounts do |> PubSub.broadcast(payload) end + def broadcast_to_policies(account_id, payload) do + account_id + |> policies_topic() + |> PubSub.broadcast(payload) + end + defp resources_topic(account_id) do "account_resources:#{account_id}" end + defp policies_topic(account_id) do + "account_policies:#{account_id}" + end + defp topic(account_id) do "accounts:#{account_id}" end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex index fd576e53b..eb323060e 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex @@ -1,14 +1,21 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do - alias Domain.{Events, Policies, PubSub} + alias Domain.{Events, Flows, Policies, PubSub, Repo} def on_insert(%{"actor_id" => actor_id, "group_id" => group_id} = _data) do + broadcast_access(:allow, actor_id, group_id) broadcast(:create, actor_id, group_id) end def on_update(_old_data, _data), do: :ok def on_delete(%{"actor_id" => actor_id, "group_id" => group_id} = _old_data) do - broadcast(:delete, actor_id, group_id) + Task.start(fn -> + {:ok, _flows} = Flows.expire_flows_for(actor_id, group_id) + broadcast_access(:reject, actor_id, group_id) + broadcast(:delete, actor_id, group_id) + end) + + :ok end def broadcast(action, actor_id, group_id) do @@ -16,9 +23,17 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do topic = Events.Hooks.Actors.memberships_topic(actor_id) :ok = PubSub.broadcast(topic, payload) + end + defp broadcast_access(action, actor_id, group_id) do # TODO: WAL - # This is an n+1 query; refactor with a cached lookup table on the client channel - :ok = Policies.broadcast_access_events_for(action, actor_id, group_id) + # This N+1 query will go away when we broadcast flows directly + Policies.Policy.Query.not_deleted() + |> Policies.Policy.Query.by_actor_group_id(group_id) + |> Repo.all() + |> Enum.each(fn policy -> + payload = {:"#{action}_access", policy.id, policy.actor_group_id, policy.resource_id} + :ok = Events.Hooks.Actors.broadcast_to_policies(actor_id, payload) + end) end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex index c35e9a38b..b9938b142 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex @@ -1,4 +1,6 @@ defmodule Domain.Events.Hooks.ActorGroups do + alias Domain.PubSub + def on_insert(_data) do :ok end @@ -10,4 +12,26 @@ defmodule Domain.Events.Hooks.ActorGroups do def on_delete(_old_data) do :ok end + + def subscribe_to_policies(actor_group_id) do + actor_group_id + |> policies_topic() + |> PubSub.subscribe() + end + + def unsubscribe_from_policies(actor_group_id) do + actor_group_id + |> policies_topic() + |> PubSub.unsubscribe() + end + + def broadcast_to_policies(actor_group_id, payload) do + actor_group_id + |> policies_topic() + |> PubSub.broadcast(payload) + end + + defp policies_topic(actor_group_id) do + "actor_group_policies:#{actor_group_id}" + end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actors.ex b/elixir/apps/domain/lib/domain/events/hooks/actors.ex index ce03328ca..280b764c9 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actors.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actors.ex @@ -33,6 +33,22 @@ defmodule Domain.Events.Hooks.Actors do "actor_memberships:#{actor_id}" end + def subscribe_to_policies(actor_id) do + actor_id + |> policies_topic() + |> PubSub.subscribe() + end + + def broadcast_to_policies(actor_id, payload) do + actor_id + |> policies_topic() + |> PubSub.broadcast(payload) + end + + defp policies_topic(actor_id) do + "actor_policies:#{actor_id}" + end + defp clients_topic(actor_id) do "actor_clients:#{actor_id}" end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex index 60a38e813..1ccea7c23 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/policies.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -1,13 +1,173 @@ defmodule Domain.Events.Hooks.Policies do - def on_insert(_data) do + alias Domain.{Events, Flows, PubSub} + require Logger + + def on_insert( + %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id + } = + _data + ) do + # TODO: WAL + # Creating a policy should broadcast directly to subscribed clients/gateways + payload = {:create_policy, policy_id} + access_payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + end + + # Enable + def on_update( + %{"disabled_at" => disabled_at} = _old_data, + %{ + "disabled_at" => nil, + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id + } = _data + ) + when not is_nil(disabled_at) do + # TODO: WAL + # Enabling a policy should broadcast directly to subscribed clients/gateways + payload = {:enable_policy, policy_id} + access_payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + end + + # Disable + def on_update( + %{"disabled_at" => nil} = _old_data, + %{ + "disabled_at" => disabled_at, + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id + } = _data + ) + when not is_nil(disabled_at) do + Task.start(fn -> + Flows.expire_flows_for_policy_id(policy_id) + + # TODO: WAL + # Disabling a policy should broadcast directly to the subscribed clients/gateways + payload = {:disable_policy, policy_id} + access_payload = {:reject_access, policy_id, actor_group_id, resource_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + end) + :ok end - def on_update(_old_data, _data) do + # Soft-delete + def on_update( + %{ + "deleted_at" => nil + } = old_data, + %{"deleted_at" => deleted_at} = _data + ) + when not is_nil(deleted_at) do + on_delete(old_data) + end + + # Breaking update - delete then create + def on_update( + %{ + "id" => old_policy_id, + "account_id" => old_account_id, + "actor_group_id" => old_actor_group_id, + "resource_id" => old_resource_id, + "conditions" => old_conditions + } = _old_data, + %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "conditions" => conditions + } = data + ) + when old_actor_group_id != actor_group_id or old_resource_id != resource_id or + old_conditions != conditions do + # Only act upon this if the policy is not deleted or disabled + if is_nil(data["deleted_at"]) and is_nil(data["disabled_at"]) do + Task.start(fn -> + Flows.expire_flows_for_policy_id(policy_id) + + # TODO: WAL + # Deleting a policy should broadcast directly to the subscribed clients/gateways + payload = {:delete_policy, old_policy_id} + access_payload = {:reject_access, old_policy_id, old_actor_group_id, old_resource_id} + :ok = broadcast(old_policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(old_account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(old_actor_group_id, access_payload) + + payload = {:create_policy, policy_id} + access_payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + end) + else + Logger.warning("Breaking update ignored for policy as it is deleted or disabled", + policy_id: policy_id + ) + end + :ok end - def on_delete(_old_data) do + # Regular update - name, description, etc + def on_update(_old_data, %{"id" => policy_id, "account_id" => account_id} = _data) do + payload = {:update_policy, policy_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + end + + def on_delete( + %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id + } = _old_data + ) do + Task.start(fn -> + Flows.expire_flows_for_policy_id(policy_id) + # TODO: WAL + # Deleting a policy should broadcast directly to the subscribed clients/gateways + payload = {:delete_policy, policy_id} + access_payload = {:reject_access, policy_id, actor_group_id, resource_id} + :ok = broadcast(policy_id, payload) + :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + end) + :ok end + + def subscribe(policy_id) do + policy_id + |> topic() + |> PubSub.subscribe() + end + + defp broadcast(policy_id, payload) do + policy_id + |> topic() + |> PubSub.broadcast(payload) + end + + defp topic(policy_id) do + "policy:#{policy_id}" + end end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 94a575845..8396f39e2 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -197,12 +197,6 @@ defmodule Domain.Flows do |> expire_flows(subject) end - def expire_flows_for(%Policies.Policy{} = policy, %Auth.Subject{} = subject) do - Flow.Query.all() - |> Flow.Query.by_policy_id(policy.id) - |> expire_flows(subject) - end - def expire_flows_for(%Resources.Resource{} = resource, %Auth.Subject{} = subject) do Flow.Query.all() |> Flow.Query.by_resource_id(resource.id) @@ -234,6 +228,12 @@ defmodule Domain.Flows do |> expire_flows() end + def expire_flows_for_policy_id(policy_id) do + Flow.Query.all() + |> Flow.Query.by_policy_id(policy_id) + |> expire_flows() + end + defp expire_flows(queryable, subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.create_flows_permission()) do queryable diff --git a/elixir/apps/domain/lib/domain/policies.ex b/elixir/apps/domain/lib/domain/policies.ex index 852cfd26d..ab62b9d73 100644 --- a/elixir/apps/domain/lib/domain/policies.ex +++ b/elixir/apps/domain/lib/domain/policies.ex @@ -1,6 +1,6 @@ defmodule Domain.Policies do - alias Domain.{Repo, PubSub} - alias Domain.{Auth, Accounts, Actors, Clients, Resources, Flows} + alias Domain.Repo + alias Domain.{Auth, Actors, Clients, Resources} alias Domain.Policies.{Authorizer, Policy, Condition} def fetch_policy_by_id(id, %Auth.Subject{} = subject, opts \\ []) do @@ -68,21 +68,11 @@ defmodule Domain.Policies do with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do Policy.Changeset.create(attrs, subject) |> Repo.insert() - # TODO: WAL - |> case do - {:ok, policy} -> - :ok = broadcast_policy_events(:create, policy) - {:ok, policy} - - {:error, reason} -> - {:error, reason} - end end end def change_policy(%Policy{} = policy, attrs) do - {update_changeset, _breaking_update} = Policy.Changeset.update(policy, attrs) - update_changeset + Policy.Changeset.update(policy, attrs) end def update_policy(%Policy{} = policy, attrs, %Auth.Subject{} = subject) do @@ -91,15 +81,8 @@ defmodule Domain.Policies do Policy.Query.not_deleted() |> Policy.Query.by_id(policy.id) |> Authorizer.for_subject(subject) - |> Repo.fetch_and_update_breakable(Policy.Query, - with: &Policy.Changeset.update(&1, attrs), - # TODO: WAL - after_update_commit: &broadcast_policy_events(:update, &1), - after_breaking_update_commit: fn updated_policy, _changeset -> - {:ok, _flows} = Flows.expire_flows_for(policy, subject) - :ok = broadcast_policy_events(:delete, policy) - :ok = broadcast_policy_events(:create, updated_policy) - end + |> Repo.fetch_and_update(Policy.Query, + with: &Policy.Changeset.update(&1, attrs) ) end end @@ -110,18 +93,8 @@ defmodule Domain.Policies do |> Policy.Query.by_id(policy.id) |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Policy.Query, - with: &Policy.Changeset.disable(&1, subject), - # TODO: WAL - after_commit: &broadcast_policy_events(:disable, &1) + with: &Policy.Changeset.disable(&1, subject) ) - |> case do - {:ok, policy} -> - {:ok, _flows} = Flows.expire_flows_for(policy, subject) - {:ok, policy} - - {:error, reason} -> - {:error, reason} - end end end @@ -131,9 +104,7 @@ defmodule Domain.Policies do |> Policy.Query.by_id(policy.id) |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Policy.Query, - with: &Policy.Changeset.enable/1, - # TODO: WAL - after_commit: &broadcast_policy_events(:enable, &1) + with: &Policy.Changeset.enable/1 ) end end @@ -141,7 +112,7 @@ defmodule Domain.Policies do def delete_policy(%Policy{} = policy, %Auth.Subject{} = subject) do Policy.Query.not_deleted() |> Policy.Query.by_id(policy.id) - |> delete_policies(policy, subject) + |> delete_policies(subject) |> case do {:ok, [policy]} -> {:ok, policy} {:ok, []} -> {:error, :not_found} @@ -152,33 +123,29 @@ defmodule Domain.Policies do def delete_policies_for(%Resources.Resource{} = resource, %Auth.Subject{} = subject) do Policy.Query.not_deleted() |> Policy.Query.by_resource_id(resource.id) - |> delete_policies(resource, subject) + |> delete_policies(subject) end def delete_policies_for(%Actors.Group{} = actor_group, %Auth.Subject{} = subject) do Policy.Query.not_deleted() |> Policy.Query.by_actor_group_id(actor_group.id) - |> delete_policies(actor_group, subject) + |> delete_policies(subject) end def delete_policies_for(%Auth.Provider{} = provider, %Auth.Subject{} = subject) do Policy.Query.not_deleted() |> Policy.Query.by_actor_group_provider_id(provider.id) - |> delete_policies(provider, subject) + |> delete_policies(subject) end def delete_policies_for(%Actors.Group{} = actor_group) do - {:ok, _flows} = Flows.expire_flows_for(actor_group) - Policy.Query.not_deleted() |> Policy.Query.by_actor_group_id(actor_group.id) |> delete_policies() end - defp delete_policies(queryable, assoc, subject) do + defp delete_policies(queryable, subject) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_policies_permission()) do - {:ok, _flows} = Flows.expire_flows_for(assoc, subject) - queryable |> Authorizer.for_subject(subject) |> delete_policies() @@ -191,12 +158,6 @@ defmodule Domain.Policies do |> Policy.Query.delete() |> Repo.update_all([]) - # TODO: WAL - :ok = - Enum.each(policies, fn policy -> - :ok = broadcast_policy_events(:delete, policy) - end) - {:ok, policies} end @@ -239,118 +200,4 @@ defmodule Domain.Policies do {:error, :unauthorized} end end - - ### PubSub - - defp policy_topic(%Policy{} = policy), do: policy_topic(policy.id) - defp policy_topic(policy_id), do: "policy:#{policy_id}" - - defp account_topic(%Accounts.Account{} = account), do: account_topic(account.id) - defp account_topic(account_id), do: "account_policies:#{account_id}" - - defp actor_group_topic(%Actors.Group{} = actor_group), do: actor_group_topic(actor_group.id) - defp actor_group_topic(actor_group_id), do: "actor_group_policies:#{actor_group_id}" - - defp actor_topic(%Actors.Actor{} = actor), do: actor_topic(actor.id) - defp actor_topic(actor_id), do: "actor_policies:#{actor_id}" - - def subscribe_to_events_for_policy(policy_or_id) do - policy_or_id |> policy_topic() |> PubSub.subscribe() - end - - def unsubscribe_from_events_for_policy(policy_or_id) do - policy_or_id |> policy_topic() |> PubSub.unsubscribe() - end - - def subscribe_to_events_for_account(account_or_id) do - account_or_id |> account_topic() |> PubSub.subscribe() - end - - def unsubscribe_from_events_for_account(account_or_id) do - account_or_id |> account_topic() |> PubSub.unsubscribe() - end - - def subscribe_to_events_for_actor(actor_or_id) do - actor_or_id |> actor_topic() |> PubSub.subscribe() - end - - def unsubscribe_from_events_for_actor(actor_or_id) do - actor_or_id |> actor_topic() |> PubSub.unsubscribe() - end - - def subscribe_to_events_for_actor_group(actor_group_or_id) do - actor_group_or_id |> actor_group_topic() |> PubSub.subscribe() - end - - def unsubscribe_from_events_for_actor_group(actor_group_or_id) do - actor_group_or_id |> actor_group_topic() |> PubSub.unsubscribe() - end - - # TODO: WAL - defp broadcast_policy_events(action, %Policy{} = policy) do - payload = {:"#{action}_policy", policy.id} - :ok = broadcast_to_policy(policy, payload) - :ok = broadcast_to_account(policy.account_id, payload) - :ok = broadcast_to_actor_group(policy.actor_group_id, access_event(action, policy)) - :ok - end - - def broadcast_access_events_for(action, actor_id, group_id) do - {:ok, _flows} = maybe_expire_flows(action, actor_id, group_id) - - Policy.Query.not_deleted() - |> Policy.Query.by_actor_group_id(group_id) - |> Repo.all() - |> Enum.each(fn policy -> - :ok = broadcast_to_actor(actor_id, access_event(action, policy)) - end) - end - - defp access_event(action, %Policy{} = policy) when action in [:create, :enable] do - {:allow_access, policy.id, policy.actor_group_id, policy.resource_id} - end - - defp access_event(action, %Policy{} = policy) when action in [:delete, :disable] do - {:reject_access, policy.id, policy.actor_group_id, policy.resource_id} - end - - defp access_event(:update, %Policy{}) do - nil - end - - defp maybe_expire_flows(action, actor_id, group_id) when action in [:delete, :disable] do - Flows.expire_flows_for(actor_id, group_id) - end - - defp maybe_expire_flows(_action, _actor_id, _group_id) do - {:ok, []} - end - - defp broadcast_to_policy(policy_or_id, payload) do - policy_or_id - |> policy_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_account(account_or_id, payload) do - account_or_id - |> account_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_actor(actor_or_id, payload) do - actor_or_id - |> actor_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_actor_group(_actor_group_or_id, nil) do - :ok - end - - defp broadcast_to_actor_group(actor_group_or_id, payload) do - actor_group_or_id - |> actor_group_topic() - |> PubSub.broadcast(payload) - end end diff --git a/elixir/apps/domain/lib/domain/policies/policy/changeset.ex b/elixir/apps/domain/lib/domain/policies/policy/changeset.ex index f859498b7..af21366da 100644 --- a/elixir/apps/domain/lib/domain/policies/policy/changeset.ex +++ b/elixir/apps/domain/lib/domain/policies/policy/changeset.ex @@ -5,7 +5,6 @@ defmodule Domain.Policies.Policy.Changeset do @fields ~w[description actor_group_id resource_id]a @update_fields ~w[description actor_group_id resource_id]a - @replace_fields ~w[actor_group_id resource_id conditions]a @required_fields ~w[actor_group_id resource_id]a def create(attrs, %Auth.Subject{} = subject) do @@ -25,18 +24,6 @@ defmodule Domain.Policies.Policy.Changeset do |> validate_required(@required_fields) |> cast_embed(:conditions, with: &Domain.Policies.Condition.Changeset.changeset/3) |> changeset() - |> maybe_breaking_update() - end - - defp maybe_breaking_update(%{valid?: false} = changeset), - do: {changeset, false} - - defp maybe_breaking_update(changeset) do - if any_field_changed?(changeset, @replace_fields) do - {changeset, true} - else - {changeset, false} - end end def disable(%Policy{} = policy, %Auth.Subject{}) do diff --git a/elixir/apps/domain/lib/domain/repo.ex b/elixir/apps/domain/lib/domain/repo.ex index a36ec6b7d..0306e3dbc 100644 --- a/elixir/apps/domain/lib/domain/repo.ex +++ b/elixir/apps/domain/lib/domain/repo.ex @@ -156,93 +156,6 @@ defmodule Domain.Repo do end end - @typedoc """ - A callback which is executed after the transaction is committed that - has a breaking change to the record in the database. - - The callback is either a function that takes the schema as an argument or - a function that takes the schema and the changeset as arguments. - - It must return `:ok`. - """ - @type break_after_commit :: (updated_schema :: term(), update_changeset :: Ecto.Changeset.t() -> - :ok) - - @typedoc """ - A callback which takes a schema and returns a changeset that is then used to update the schema and a boolean indicating whether the update is a breaking change. - """ - @type fetch_update_changeset_fun :: (term() -> - {update_changeset :: Ecto.Changeset.t(), - breaking_change :: true | false}) - - @doc """ - Uses query to fetch a single result from the database, locks it for update and - then updates it using a changesets within a database transaction. Different callbacks can - be used for a breaking change to the record. - """ - @spec fetch_and_update_breakable( - queryable :: Ecto.Queryable.t(), - query_module :: module(), - opts :: - [ - {:with, fetch_update_changeset_fun()} - | {:preload, term()} - | {:filter, Domain.Repo.Filter.filters()} - | {:after_update_commit, update_after_commit() | [update_after_commit()]} - | {:after_breaking_update_commit, break_after_commit() | [break_after_commit()]} - ] - | Keyword.t() - ) :: - {:updated, Ecto.Schema.t()} - | {:breaking_update, Ecto.Schema.t(), Ecto.Schema.t()} - | {:error, :not_found} - | {:error, {:unknown_filter, metadata :: Keyword.t()}} - | {:error, {:invalid_type, metadata :: Keyword.t()}} - | {:error, {:invalid_value, metadata :: Keyword.t()}} - | {:error, Ecto.Changeset.t()} - | {:error, term()} - def fetch_and_update_breakable(queryable, query_module, opts) do - {preload, opts} = Keyword.pop(opts, :preload, []) - {filter, opts} = Keyword.pop(opts, :filter, []) - {after_update_commit, opts} = Keyword.pop(opts, :after_update_commit, []) - {after_breaking_update_commit, opts} = Keyword.pop(opts, :after_breaking_update_commit, []) - {changeset_fun, transaction_opts} = Keyword.pop!(opts, :with) - - with {:ok, queryable} <- Filter.filter(queryable, query_module, filter) do - Ecto.Multi.new() - |> Ecto.Multi.one(:fetch_and_lock, fn - _effects_so_far -> - Ecto.Query.lock(queryable, "FOR NO KEY UPDATE") - end) - |> Ecto.Multi.run(:changeset, fn _repo, %{fetch_and_lock: schema} -> - {%Ecto.Changeset{} = update_changeset, breaking} = - changeset_fun.(schema) - - {:ok, {update_changeset, breaking}} - end) - |> Ecto.Multi.update(:update, fn - %{changeset: {update_changeset, _breaking}} -> - update_changeset - end) - |> transaction(transaction_opts) - |> case do - {:ok, %{update: updated, changeset: {update_changeset, false}}} -> - :ok = execute_after_commit(updated, update_changeset, after_update_commit) - {:updated, execute_preloads(updated, query_module, preload)} - - {:ok, %{update: updated, changeset: {update_changeset, true}}} -> - :ok = execute_after_commit(updated, update_changeset, after_breaking_update_commit) - {:updated, execute_preloads(updated, query_module, preload)} - - {:error, :fetch_and_lock, reason, _changes_so_far} -> - {:error, reason} - - {:error, :update, changeset, _changes_so_far} -> - {:error, changeset} - end - end - end - defp execute_after_commit(schema_or_tuple, changeset_or_changesets, after_commit) do after_commit |> List.wrap() diff --git a/elixir/apps/domain/priv/repo/seeds.exs b/elixir/apps/domain/priv/repo/seeds.exs index be071def7..b83c2f323 100644 --- a/elixir/apps/domain/priv/repo/seeds.exs +++ b/elixir/apps/domain/priv/repo/seeds.exs @@ -882,9 +882,9 @@ defmodule Domain.Repo.Seeds do Resources.create_resource( %{ type: :dns, - name: "google.com", - address: "google.com", - address_description: "https://google.com/", + name: "foobar.com", + address: "foobar.com", + address_description: "https://foobar.com/", connections: [%{gateway_group_id: gateway_group.id}], filters: [] }, @@ -947,8 +947,8 @@ defmodule Domain.Repo.Seeds do Resources.create_resource( %{ type: :dns, - name: "Google", - address: "*.google.com", + name: "Example", + address: "*.example.com", connections: [%{gateway_group_id: gateway_group.id}], filters: [] }, @@ -976,9 +976,9 @@ defmodule Domain.Repo.Seeds do Resources.create_resource( %{ type: :ip, - name: "CloudFlare DNS", - address: "1.1.1.1", - address_description: "http://1.1.1.1:3000/", + name: "Public DNS", + address: "1.2.3.4", + address_description: "http://1.2.3.4:3000/", connections: [%{gateway_group_id: gateway_group.id}], filters: [ %{ports: ["80", "433"], protocol: :tcp}, diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index a69e42290..91f672642 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -854,19 +854,9 @@ defmodule Domain.ActorsTest do provider_identifier: "G:GROUP_ID4" ) - group1_policy = Fixtures.Policies.create_policy(account: account, actor_group: group1) - actor = Fixtures.Actors.create_actor(account: account) Fixtures.Actors.create_membership(account: account, actor: actor, group: group1) - group1_flow = - Fixtures.Flows.create_flow( - account: account, - actor_group: group1, - resource_id: group1_policy.resource_id, - policy: group1_policy - ) - attrs_list = [ %{"name" => "Group:Infrastructure", "provider_identifier" => "G:GROUP_ID2"}, %{"name" => "Group:Security", "provider_identifier" => "G:GROUP_ID3"}, @@ -889,9 +879,6 @@ defmodule Domain.ActorsTest do assert Repo.aggregate(Actors.Group.Query.not_deleted(), :count) == 3 assert Map.keys(group_ids_by_provider_identifier) |> length() == 3 - - group1_flow = Repo.reload(group1_flow) - assert DateTime.compare(group1_flow.expires_at, DateTime.utc_now()) == :lt end test "circuit breaker prevents mass deletion of groups", %{ @@ -1259,6 +1246,10 @@ defmodule Domain.ActorsTest do "group_id" => group2.id }) + # TODO: WAL + # Remove this when direct broadcast is implemented + Process.sleep(100) + flow = Repo.reload(flow) assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index 9d05145f7..d118676c9 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -632,9 +632,9 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Domain.Policies.subscribe_to_events_for_actor(actor) - :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) - :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) + :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) + :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") @@ -731,13 +731,25 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do group_id = deleted_group.id resource_id = deleted_policy.resource_id + # Simulate WAL events Events.Hooks.ActorGroupMemberships.on_delete(%{ "actor_id" => deleted_identity.actor_id, "group_id" => deleted_group.id }) + Events.Hooks.Policies.on_delete(%{ + "id" => policy_id, + "actor_group_id" => group_id, + "resource_id" => resource_id, + "account_id" => deleted_identity.account_id + }) + assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + # Deleted policies expire all flows authorized by them deleted_group_flow = Repo.reload(deleted_group_flow) assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index 3f7a6413c..fd1872a46 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -419,9 +419,9 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Domain.Policies.subscribe_to_events_for_actor(actor) - :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) - :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) + :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) + :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}") WorkOSDirectory.mock_list_directories_endpoint(bypass) @@ -495,13 +495,25 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do group_id = deleted_group.id resource_id = deleted_policy.resource_id + # Simulate the WAL events Events.Hooks.ActorGroupMemberships.on_delete(%{ "actor_id" => actor.id, "group_id" => deleted_group.id }) + Events.Hooks.Policies.on_delete(%{ + "id" => policy_id, + "account_id" => deleted_policy.account_id, + "actor_group_id" => group_id, + "resource_id" => resource_id + }) + assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + # Deleted policies expire all flows authorized by them deleted_group_flow = Repo.reload(deleted_group_flow) assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index 22c43f502..bd20bcc7e 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -467,9 +467,9 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Domain.Policies.subscribe_to_events_for_actor(actor) - :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) - :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) + :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) + :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) MicrosoftEntraDirectory.mock_groups_list_endpoint( bypass, @@ -562,13 +562,25 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do group_id = deleted_group.id resource_id = deleted_policy.resource_id + # Simulate WAL events Events.Hooks.ActorGroupMemberships.on_delete(%{ "actor_id" => deleted_identity.actor_id, "group_id" => deleted_group.id }) + Events.Hooks.Policies.on_delete(%{ + "id" => policy_id, + "account_id" => deleted_policy.id, + "actor_group_id" => group_id, + "resource_id" => resource_id + }) + assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + # Deleted policies expire all flows authorized by them deleted_group_flow = Repo.reload(deleted_group_flow) assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index 8dc03341d..b61dfe108 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -711,9 +711,9 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Domain.Policies.subscribe_to_events_for_actor(actor) - :ok = Domain.Policies.subscribe_to_events_for_actor(other_actor) - :ok = Domain.Policies.subscribe_to_events_for_actor_group(deleted_group) + :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) + :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups)) OktaDirectory.mock_users_list_endpoint(bypass, 200, Jason.encode!(users)) @@ -796,13 +796,25 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do group_id = deleted_group.id resource_id = deleted_policy.resource_id + # Simulate WAL events Events.Hooks.ActorGroupMemberships.on_delete(%{ "actor_id" => actor.id, "group_id" => deleted_group.id }) + Events.Hooks.Policies.on_delete(%{ + "id" => policy_id, + "actor_group_id" => group_id, + "resource_id" => resource_id, + "account_id" => deleted_policy.account_id + }) + assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + # Deleted policies expire all flows authorized by them deleted_group_flow = Repo.reload(deleted_group_flow) assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs index 6e6db01b6..bb506c2a9 100644 --- a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs @@ -1,26 +1,269 @@ defmodule Domain.Events.Hooks.PoliciesTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Policies - - setup do - %{old_data: %{}, data: %{}} - end + alias Domain.Events describe "insert/1" do - test "returns :ok", %{data: data} do + test "broadcasts :create_policy and :allow_access" do + policy_id = "policy-123" + account_id = "account-456" + actor_group_id = "group-456" + resource_id = "resource-789" + + data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id + } + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + assert :ok == on_insert(data) + assert_receive {:create_policy, ^policy_id} + assert_receive {:create_policy, ^policy_id} + assert_receive {:allow_access, ^policy_id, ^actor_group_id, ^resource_id} end end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do + test "enable: broadcasts :enable_policy and :allow_access" do + policy_id = "policy-123" + account_id = "account-456" + actor_group_id = "group-456" + resource_id = "resource-789" + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "disabled_at" => "2023-10-01T00:00:00Z" + } + + data = Map.put(old_data, "disabled_at", nil) + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + assert :ok == on_update(old_data, data) + assert_receive {:enable_policy, ^policy_id} + assert_receive {:enable_policy, ^policy_id} + assert_receive {:allow_access, ^policy_id, ^actor_group_id, ^resource_id} + end + + test "disable: broadcasts :disable_policy and :reject_access" do + flow = Fixtures.Flows.create_flow() + policy_id = flow.policy_id + account_id = flow.account_id + actor_group_id = "group-456" + resource_id = flow.resource_id + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "disabled_at" => nil + } + + data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + assert_receive {:disable_policy, ^policy_id} + assert_receive {:disable_policy, ^policy_id} + assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + + flow = Repo.reload(flow) + + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + end + + test "soft-delete: broadcasts :delete_policy and :reject_access" do + flow = Fixtures.Flows.create_flow() + policy_id = flow.policy_id + account_id = flow.account_id + actor_group_id = "group-456" + resource_id = flow.resource_id + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "deleted_at" => nil + } + + data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + assert_receive {:delete_policy, ^policy_id} + assert_receive {:delete_policy, ^policy_id} + assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + + flow = Repo.reload(flow) + + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + end + + test "breaking update: broadcasts :delete_policy, :reject_access, :create_policy, :allow_access" do + flow = Fixtures.Flows.create_flow() + policy_id = flow.policy_id + account_id = flow.account_id + actor_group_id = "group-456" + resource_id = flow.resource_id + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "conditions" => [] + } + + data = Map.put(old_data, "resource_id", "new-resource-123") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + + assert_receive {:delete_policy, ^policy_id} + assert_receive {:delete_policy, ^policy_id} + assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} + + assert_receive {:create_policy, ^policy_id} + assert_receive {:create_policy, ^policy_id} + assert_receive {:allow_access, ^policy_id, ^actor_group_id, "new-resource-123"} + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + + flow = Repo.reload(flow) + + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + end + + test "breaking update: disabled policy has no side-effects" do + flow = Fixtures.Flows.create_flow() + policy_id = flow.policy_id + account_id = flow.account_id + actor_group_id = "group-456" + resource_id = flow.resource_id + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "disabled_at" => "2023-10-01T00:00:00Z" + } + + data = Map.put(old_data, "resource_id", "new-resource-123") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + + refute_receive {:delete_policy, ^policy_id} + refute_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} + refute_receive {:create_policy, ^policy_id} + refute_receive {:allow_access, ^policy_id, ^actor_group_id, "new-resource-123"} + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + + flow = Repo.reload(flow) + + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :gt + end + + test "non-breaking-update: broadcasts :update_policy" do + policy_id = "policy-123" + account_id = "account-456" + actor_group_id = "group-456" + resource_id = "resource-789" + + old_data = %{ + "description" => "Old Policy", + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "disabled_at" => "2023-10-01T00:00:00Z" + } + + data = Map.put(old_data, "resource_id", "new-resource-123") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + + assert_receive {:update_policy, ^policy_id} + assert_receive {:update_policy, ^policy_id} end end describe "delete/1" do - test "returns :ok", %{data: data} do - assert :ok == on_delete(data) + test "broadcasts :delete_policy and :reject_access" do + flow = Fixtures.Flows.create_flow() + policy_id = flow.policy_id + account_id = flow.account_id + actor_group_id = "group-456" + resource_id = flow.resource_id + + old_data = %{ + "id" => policy_id, + "account_id" => account_id, + "actor_group_id" => actor_group_id, + "resource_id" => resource_id, + "deleted_at" => nil + } + + data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") + + :ok = subscribe(policy_id) + :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) + :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + + assert :ok == on_update(old_data, data) + assert_receive {:delete_policy, ^policy_id} + assert_receive {:delete_policy, ^policy_id} + assert_receive {:reject_access, ^policy_id, ^actor_group_id, ^resource_id} + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + + flow = Repo.reload(flow) + + assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs index cb5bdce05..80c36284a 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs @@ -74,6 +74,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this after direct broadcast Process.sleep(100) flow = Repo.reload(flow) @@ -95,6 +97,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this after direct broadcast Process.sleep(100) flow = Repo.reload(flow) @@ -116,6 +120,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this after direct broadcast Process.sleep(100) flow = Repo.reload(flow) @@ -137,6 +143,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert :ok == on_update(old_data, data) + # TODO: WAL + # Remove this after direct broadcast Process.sleep(100) flow = Repo.reload(flow) diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index d24f8c3bf..c4927b2e6 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -999,10 +999,9 @@ defmodule Domain.FlowsTest do test "expires flows for policy", %{ flow: flow, - policy: policy, - subject: subject + policy: policy } do - assert {:ok, [expired_flow]} = expire_flows_for(policy, subject) + assert {:ok, [expired_flow]} = expire_flows_for_policy_id(policy.id) assert DateTime.diff(expired_flow.expires_at, DateTime.utc_now()) <= 1 assert expired_flow.id == flow.id end diff --git a/elixir/apps/domain/test/domain/policies_test.exs b/elixir/apps/domain/test/domain/policies_test.exs index 930736a2e..7a55c7d75 100644 --- a/elixir/apps/domain/test/domain/policies_test.exs +++ b/elixir/apps/domain/test/domain/policies_test.exs @@ -390,48 +390,6 @@ defmodule Domain.PoliciesTest do } ] end - - test "broadcasts an account message when policy is created", %{ - account: account, - subject: subject - } do - resource = Fixtures.Resources.create_resource(account: account) - actor_group = Fixtures.Actors.create_group(account: account) - - attrs = %{ - actor_group_id: actor_group.id, - resource_id: resource.id - } - - :ok = subscribe_to_events_for_account(account) - - assert {:ok, policy} = create_policy(attrs, subject) - - assert_receive {:create_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts an actor group message when policy is created", %{ - account: account, - subject: subject - } do - resource = Fixtures.Resources.create_resource(account: account) - actor_group = Fixtures.Actors.create_group(account: account) - - attrs = %{ - actor_group_id: actor_group.id, - resource_id: resource.id - } - - :ok = subscribe_to_events_for_actor_group(actor_group) - - assert {:ok, policy} = create_policy(attrs, subject) - - assert_receive {:allow_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == actor_group.id - assert resource_id == resource.id - end end describe "update_policy/3" do @@ -453,7 +411,7 @@ defmodule Domain.PoliciesTest do end test "does nothing on empty params", %{policy: policy, subject: subject} do - assert {:updated, _policy} = update_policy(policy, %{}, subject) + assert {:ok, _policy} = update_policy(policy, %{}, subject) end test "returns changeset error on invalid params", %{account: account, subject: subject} do @@ -466,51 +424,10 @@ defmodule Domain.PoliciesTest do test "allows update to description", %{policy: policy, subject: subject} do attrs = %{description: "updated policy description"} - assert {:updated, updated_policy} = update_policy(policy, attrs, subject) + assert {:ok, updated_policy} = update_policy(policy, attrs, subject) assert updated_policy.description == attrs.description end - test "broadcasts an account message when policy is updated", %{ - account: account, - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_account(account) - - attrs = %{description: "updated policy description"} - assert {:updated, policy} = update_policy(policy, attrs, subject) - - assert_receive {:update_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts a policy message when policy is updated", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_policy(policy) - - attrs = %{description: "updated policy description"} - assert {:updated, updated_policy} = update_policy(policy, attrs, subject) - assert updated_policy.id == policy.id - - assert_receive {:update_policy, policy_id} - assert policy_id == policy.id - end - - test "does not broadcast an actor group message when policy is updated", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - - attrs = %{description: "updated policy description"} - assert {:updated, _policy} = update_policy(policy, attrs, subject) - - refute_receive {:allow_access, _policy_id, _actor_group_id, _resource_id} - refute_receive {:reject_access, _policy_id, _actor_group_id, _resource_id} - end - test "updates a policy when resource_id is changed", %{ policy: policy, account: account, @@ -520,7 +437,7 @@ defmodule Domain.PoliciesTest do attrs = %{resource_id: new_resource.id} - assert {:updated, updated_policy} = update_policy(policy, attrs, subject) + assert {:ok, updated_policy} = update_policy(policy, attrs, subject) assert updated_policy.resource_id != policy.resource_id assert updated_policy.resource_id == attrs[:resource_id] @@ -537,7 +454,7 @@ defmodule Domain.PoliciesTest do attrs = %{actor_group_id: new_actor_group.id} - assert {:updated, updated_policy} = + assert {:ok, updated_policy} = update_policy(policy, attrs, subject) assert updated_policy.id == policy.id @@ -561,7 +478,7 @@ defmodule Domain.PoliciesTest do ] } - assert {:updated, updated_policy} = + assert {:ok, updated_policy} = update_policy(policy, attrs, subject) assert updated_policy.id == policy.id @@ -585,36 +502,17 @@ defmodule Domain.PoliciesTest do ] end - test "broadcasts events and expires flow for updated policy", %{ + test "allows breaking updates", %{ policy: policy, account: account, subject: subject } do - flow = Fixtures.Flows.create_flow(account: account, subject: subject, policy: policy) new_resource = Fixtures.Resources.create_resource(account: account) new_actor_group = Fixtures.Actors.create_group(account: account) - :ok = subscribe_to_events_for_policy(policy) - :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - attrs = %{resource_id: new_resource.id, actor_group_id: new_actor_group.id} - assert {:updated, updated_policy} = update_policy(policy, attrs, subject) - - # Updating a policy sends delete and create events - assert_receive {:delete_policy, policy_id} - assert policy_id == policy.id - - assert_receive {:create_policy, policy_id} - assert policy_id == updated_policy.id - - assert_receive {:reject_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == policy.actor_group_id - assert resource_id == policy.resource_id - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt + assert {:ok, _updated_policy} = update_policy(policy, attrs, subject) end test "returns error when subject has no permission to update policies", %{ @@ -677,70 +575,6 @@ defmodule Domain.PoliciesTest do assert is_nil(other_policy.disabled_at) end - test "expires policy flows", %{ - account: account, - policy: policy, - identity: identity, - subject: subject - } do - client = Fixtures.Clients.create_client(account: account, identity: identity) - - flow = - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client, - policy: policy - ) - - assert {:ok, _policy} = disable_policy(policy, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt - end - - test "broadcasts an account message when policy is disabled", %{ - account: account, - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_account(account) - - assert {:ok, policy} = disable_policy(policy, subject) - - assert_receive {:disable_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts a policy message when policy is disabled", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_policy(policy) - - assert {:ok, policy} = disable_policy(policy, subject) - - assert_receive {:disable_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts an actor group message when policy is disabled", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - - assert {:ok, policy} = disable_policy(policy, subject) - - assert_receive {:reject_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == policy.actor_group_id - assert resource_id == policy.resource_id - end - test "does not do anything when an policy is disabled twice", %{ subject: subject, account: account @@ -796,45 +630,6 @@ defmodule Domain.PoliciesTest do assert is_nil(policy.disabled_at) end - test "broadcasts an account message when policy is enabled", %{ - account: account, - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_account(account) - - assert {:ok, policy} = enable_policy(policy, subject) - - assert_receive {:enable_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts a policy message when policy is enabled", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_policy(policy) - - assert {:ok, policy} = enable_policy(policy, subject) - - assert_receive {:enable_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts an actor group message when policy is enabled", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - - assert {:ok, policy} = enable_policy(policy, subject) - - assert_receive {:allow_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == policy.actor_group_id - assert resource_id == policy.resource_id - end - test "does not do anything when an policy is enabled twice", %{ subject: subject, policy: policy @@ -881,70 +676,6 @@ defmodule Domain.PoliciesTest do assert deleted_policy.deleted_at != nil end - test "expires policy flows", %{ - account: account, - policy: policy, - identity: identity, - subject: subject - } do - client = Fixtures.Clients.create_client(account: account, identity: identity) - - flow = - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client, - policy: policy - ) - - assert {:ok, _policy} = delete_policy(policy, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - - flow = Repo.reload(flow) - assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt - end - - test "broadcasts an account message when policy is deleted", %{ - account: account, - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_account(account) - - assert {:ok, policy} = delete_policy(policy, subject) - - assert_receive {:delete_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts a policy message when policy is deleted", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_policy(policy) - - assert {:ok, policy} = delete_policy(policy, subject) - - assert_receive {:delete_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts an actor group message when policy is deleted", %{ - subject: subject, - policy: policy - } do - :ok = subscribe_to_events_for_actor_group(policy.actor_group_id) - - assert {:ok, policy} = delete_policy(policy, subject) - - assert_receive {:reject_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == policy.actor_group_id - assert resource_id == policy.resource_id - end - test "returns error when subject has no permission to delete policies", %{ policy: policy, subject: subject @@ -1081,70 +812,6 @@ defmodule Domain.PoliciesTest do assert is_nil(Repo.get(Policies.Policy, other_policy.id).deleted_at) end - test "expires policy flows", %{ - account: account, - identity: identity, - policy: policy, - resource: resource, - subject: subject - } do - client = Fixtures.Clients.create_client(account: account, identity: identity) - - Fixtures.Flows.create_flow( - account: account, - subject: subject, - client: client, - policy: policy, - resource: resource - ) - - assert {:ok, [_deleted_policy]} = delete_policies_for(resource, subject) - - expires_at = Repo.one(Domain.Flows.Flow).expires_at - assert DateTime.diff(expires_at, DateTime.utc_now()) <= 1 - end - - test "broadcasts an account message when policy is deleted", %{ - account: account, - resource: resource, - subject: subject - } do - :ok = subscribe_to_events_for_account(account) - - assert {:ok, [policy]} = delete_policies_for(resource, subject) - - assert_receive {:delete_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts a policy message when policy is deleted", %{ - resource: resource, - policy: policy, - subject: subject - } do - :ok = subscribe_to_events_for_policy(policy) - - assert {:ok, [policy]} = delete_policies_for(resource, subject) - - assert_receive {:delete_policy, policy_id} - assert policy_id == policy.id - end - - test "broadcasts an actor group message when policy is deleted", %{ - resource: resource, - actor_group: actor_group, - subject: subject - } do - :ok = subscribe_to_events_for_actor_group(actor_group) - - assert {:ok, [policy]} = delete_policies_for(resource, subject) - - assert_receive {:reject_access, policy_id, actor_group_id, resource_id} - assert policy_id == policy.id - assert actor_group_id == policy.actor_group_id - assert resource_id == policy.resource_id - end - test "returns error when subject has no permission to delete policies", %{ resource: resource, subject: subject diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index b94184e3f..864552b3f 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -1571,6 +1571,8 @@ defmodule Domain.ResourcesTest do # TODO: WAL # Remove this when directly broadcasting flow removals Events.Hooks.ResourceConnections.on_delete(%{"resource_id" => resource.id}) + # TODO: WAL + # Remove this after direct broadcast Process.sleep(100) flow = Repo.reload(flow) assert DateTime.compare(flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/web/lib/web/live/policies/edit.ex b/elixir/apps/web/lib/web/live/policies/edit.ex index 666da8d55..3bd26b960 100644 --- a/elixir/apps/web/lib/web/live/policies/edit.ex +++ b/elixir/apps/web/lib/web/live/policies/edit.ex @@ -219,9 +219,8 @@ defmodule Web.Policies.Edit do |> maybe_drop_unsupported_conditions(socket) case Policies.update_policy(socket.assigns.policy, params, socket.assigns.subject) do - {:updated, updated_policy} -> - {:noreply, - push_navigate(socket, to: ~p"/#{socket.assigns.account}/policies/#{updated_policy}")} + {:ok, policy} -> + {:noreply, push_navigate(socket, to: ~p"/#{socket.assigns.account}/policies/#{policy}")} {:error, changeset} -> {:noreply, assign(socket, form: to_form(changeset))} diff --git a/elixir/apps/web/lib/web/live/policies/index.ex b/elixir/apps/web/lib/web/live/policies/index.ex index fa323dd7c..1ea3a2909 100644 --- a/elixir/apps/web/lib/web/live/policies/index.ex +++ b/elixir/apps/web/lib/web/live/policies/index.ex @@ -1,10 +1,10 @@ defmodule Web.Policies.Index do use Web, :live_view - alias Domain.Policies + alias Domain.{Events, Policies} def mount(_params, _session, socket) do if connected?(socket) do - :ok = Policies.subscribe_to_events_for_account(socket.assigns.account) + :ok = Events.Hooks.Accounts.subscribe_to_policies(socket.assigns.account.id) end socket = diff --git a/elixir/apps/web/lib/web/live/policies/show.ex b/elixir/apps/web/lib/web/live/policies/show.ex index 88511114e..4a05c16bd 100644 --- a/elixir/apps/web/lib/web/live/policies/show.ex +++ b/elixir/apps/web/lib/web/live/policies/show.ex @@ -1,7 +1,7 @@ defmodule Web.Policies.Show do use Web, :live_view import Web.Policies.Components - alias Domain.{Accounts, Policies, Flows, Auth} + alias Domain.{Accounts, Policies, Events, Flows, Auth} def mount(%{"id" => id}, _session, socket) do with {:ok, policy} <- @@ -16,7 +16,7 @@ defmodule Web.Policies.Show do providers = Auth.all_active_providers_for_account!(socket.assigns.account) if connected?(socket) do - :ok = Policies.subscribe_to_events_for_policy(policy) + :ok = Events.Hooks.Policies.subscribe(policy.id) end socket = diff --git a/elixir/apps/web/test/web/live/policies/index_test.exs b/elixir/apps/web/test/web/live/policies/index_test.exs index 78398eb69..465e16935 100644 --- a/elixir/apps/web/test/web/live/policies/index_test.exs +++ b/elixir/apps/web/test/web/live/policies/index_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Policies.IndexTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -92,7 +93,14 @@ defmodule Web.Live.Policies.IndexTest do refute html =~ "The table data has changed." refute html =~ "reload-btn" - Fixtures.Policies.create_policy(account: account, description: "foo bar") + policy = Fixtures.Policies.create_policy(account: account, description: "foo bar") + + Events.Hooks.Policies.on_insert(%{ + "id" => policy.id, + "actor_group_id" => policy.actor_group_id, + "resource_id" => policy.resource_id, + "account_id" => account.id + }) reload_btn = lv @@ -120,6 +128,17 @@ defmodule Web.Live.Policies.IndexTest do Domain.Policies.delete_policy(policy, subject) + Events.Hooks.Policies.on_delete(%{ + "id" => policy.id, + "actor_group_id" => policy.actor_group_id, + "resource_id" => policy.resource_id, + "account_id" => account.id + }) + + # TODO: WAL + # Remove this after direct broadcast + Process.sleep(100) + reload_btn = lv |> element("#policies-reload-btn")