fix(portal): Add sync deletion circuit breaker (#9194)

Why:

* We have seen issues with the Google Admin SDK API returning bad
information when requesting directory info, such as Groups and
Identities. The requests seem to return successful HTTP codes, but the
data is missing, which our sync system interprets as all
Groups/Identities having been deleted from the Google Workspace. To
prevent this from happening, a deletion circuit breaker function has
been added that stops a sync job if a certain percentage of the groups
or identities would be deleted on the current run. This should prevent
the possibility of mass deleting Groups/Identities if an Identity
Provider hands back incorrect info on any sync.

Fixes: #9188
This commit is contained in:
Brian Manifold
2025-05-21 21:31:21 -07:00
committed by GitHub
parent 600f00f82e
commit 3eacb6b9b5
9 changed files with 357 additions and 25 deletions

View File

@@ -3,6 +3,7 @@ defmodule Domain.Actors.Group.Sync do
alias Domain.Auth
alias Domain.Actors
alias Domain.Actors.Group
require Logger
def sync_provider_groups(%Auth.Provider{} = provider, attrs_list) do
attrs_by_provider_identifier =
@@ -14,6 +15,7 @@ defmodule Domain.Actors.Group.Sync do
with {:ok, groups} <- all_provider_groups(provider),
{:ok, {upsert, delete}} <- plan_groups_update(groups, provider_identifiers),
:ok <- deletion_circuit_breaker(groups, delete, provider),
{:ok, deleted} <- delete_groups(provider, delete),
{:ok, upserted} <- upsert_groups(provider, attrs_by_provider_identifier, upsert) do
group_ids_by_provider_identifier =
@@ -57,6 +59,47 @@ defmodule Domain.Actors.Group.Sync do
{:ok, {upsert, delete}}
end
# Safety valve for group sync: aborts the run when the planned deletions make
# up too large a share of the groups currently known for the provider, which
# is what a bad Identity Provider response (e.g. from the Google Admin SDK
# API) looks like from our side.
#
# The share is computed as a whole-number percentage (rounded). With more than
# 40 existing groups the sync is refused at 25% or more; with 40 or fewer it
# is refused at 50% or more.
#
# Returns `:ok` when the sync may proceed, otherwise
# `{:error, "Sync deletion of groups too large"}` after logging an error.

# Nothing exists yet, so nothing can be deleted.
defp deletion_circuit_breaker([], _groups_to_delete, _provider), do: :ok

# Nothing is planned for deletion.
defp deletion_circuit_breaker(_groups, [], _provider), do: :ok

defp deletion_circuit_breaker(groups, groups_to_delete, provider) do
  existing_count = length(groups)
  doomed_count = length(groups_to_delete)
  doomed_percentage = round(doomed_count / existing_count * 100)

  # Larger directories get a stricter limit than small ones.
  max_percentage = if existing_count > 40, do: 25, else: 50

  if doomed_percentage >= max_percentage do
    Logger.error("Refusing to mass delete groups",
      groups_length: existing_count,
      deletion_length: doomed_count,
      provider_id: provider.id
    )

    {:error, "Sync deletion of groups too large"}
  else
    :ok
  end
end
defp delete_groups(provider, provider_identifiers_to_delete) do
Group.Query.not_deleted()
|> Group.Query.by_account_id(provider.account_id)

View File

@@ -94,7 +94,17 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.APIClient do
})
)
list_all(uri, api_token, "users")
case list_all(uri, api_token, "users") do
{:ok, []} ->
Logger.warning("Google Workspace API returned 0 users")
{:ok, []}
{:ok, [_head | _tail] = list} ->
{:ok, list}
{:error, reason} ->
{:error, reason}
end
end
def list_groups(api_token) do
@@ -111,7 +121,17 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.APIClient do
})
)
list_all(uri, api_token, "groups")
case list_all(uri, api_token, "groups") do
{:ok, []} ->
Logger.warning("Google Workspace API returned 0 groups")
{:ok, []}
{:ok, [_head | _tail] = list} ->
{:ok, list}
{:error, reason} ->
{:error, reason}
end
end
# Note: this function does not return the root (`/`) org unit

View File

@@ -1,6 +1,7 @@
defmodule Domain.Auth.Identity.Sync do
alias Domain.Repo
alias Domain.Auth.{Identity, Provider}
require Logger
def sync_provider_identities(%Provider{} = provider, attrs_list) do
attrs_by_provider_identifier =
@@ -13,6 +14,7 @@ defmodule Domain.Auth.Identity.Sync do
with {:ok, identities} <- all_provider_identities(provider),
{:ok, {insert, update, delete}} <-
plan_identities_update(identities, provider_identifiers),
:ok <- deletion_circuit_breaker(identities, delete, provider),
{:ok, deleted} <- delete_identities(provider, delete),
{:ok, inserted} <-
insert_identities(provider, attrs_by_provider_identifier, insert),
@@ -72,6 +74,47 @@ defmodule Domain.Auth.Identity.Sync do
{:ok, {insert, update, delete}}
end
# Safety valve for identity sync: aborts the run when the planned deletions
# make up too large a share of the identities currently known for the
# provider, which is what a bad Identity Provider response (e.g. from the
# Google Admin SDK API) looks like from our side.
#
# The share is computed as a whole-number percentage (rounded). With more than
# 40 existing identities the sync is refused at 25% or more; with 40 or fewer
# it is refused at 50% or more.
#
# Returns `:ok` when the sync may proceed, otherwise
# `{:error, "Sync deletion of identities too large"}` after logging an error.

# Nothing exists yet, so nothing can be deleted.
defp deletion_circuit_breaker([], _identities_to_delete, _provider), do: :ok

# Nothing is planned for deletion.
defp deletion_circuit_breaker(_identities, [], _provider), do: :ok

defp deletion_circuit_breaker(identities, identities_to_delete, provider) do
  existing_count = length(identities)
  doomed_count = length(identities_to_delete)
  doomed_percentage = round(doomed_count / existing_count * 100)

  # Larger directories get a stricter limit than small ones.
  max_percentage = if existing_count > 40, do: 25, else: 50

  if doomed_percentage >= max_percentage do
    Logger.error("Refusing to mass delete identities",
      identities_length: existing_count,
      deletion_length: doomed_count,
      provider_id: provider.id
    )

    {:error, "Sync deletion of identities too large"}
  else
    :ok
  end
end
defp delete_identities(provider, provider_identifiers_to_delete) do
provider_identifiers_to_delete = Enum.uniq(provider_identifiers_to_delete)

View File

@@ -845,30 +845,58 @@ defmodule Domain.ActorsTest do
provider_identifier: "OU:OU_ID1"
)
group3 =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_ID2"
)
group4 =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_ID3"
)
group5 =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_ID4"
)
:ok = subscribe_to_membership_updates_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor(actor)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group1)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group2)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group3)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group4)
:ok = Domain.Policies.subscribe_to_events_for_actor_group(group5)
:ok = Domain.Flows.subscribe_to_flow_expiration_events(group1_flow)
attrs_list = []
attrs_list = [
%{"name" => "Group:Infrastructure", "provider_identifier" => "G:GROUP_ID2"},
%{"name" => "Group:Security", "provider_identifier" => "G:GROUP_ID3"},
%{"name" => "Group:Finance", "provider_identifier" => "G:GROUP_ID4"}
]
assert {:ok,
%{
groups: [_group1, _group2],
plan: {[], delete},
groups: [_group1, _group2, _group3, _group4, _group5],
plan: {_upsert, delete},
deleted: [deleted_group1, deleted_group2],
upserted: [],
upserted: [_upserted_group3, _upserted_group4, _upserted_group5],
group_ids_by_provider_identifier: group_ids_by_provider_identifier
}} = sync_provider_groups(provider, attrs_list)
assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in delete))
assert deleted_group1.provider_identifier in ["G:GROUP_ID1", "OU:OU_ID1"]
assert deleted_group2.provider_identifier in ["G:GROUP_ID1", "OU:OU_ID1"]
assert Repo.aggregate(Actors.Group, :count) == 2
assert Repo.aggregate(Actors.Group.Query.not_deleted(), :count) == 0
assert Repo.aggregate(Actors.Group, :count) == 5
assert Repo.aggregate(Actors.Group.Query.not_deleted(), :count) == 3
assert Enum.empty?(group_ids_by_provider_identifier)
assert Map.keys(group_ids_by_provider_identifier) |> length() == 3
actor_id = actor.id
@@ -884,6 +912,54 @@ defmodule Domain.ActorsTest do
refute_receive {:reject_access, _policy_id, ^group2_id, _resource_id}
end
test "circuit breaker prevents mass deletion of groups", %{
  account: account,
  provider: provider
} do
  # Seed five provider-synced groups; an empty sync payload would then plan
  # to delete every one of them.
  existing_identifiers = [
    "G:GROUP_ID1",
    "OU:OU_ID1",
    "G:GROUP_ID2",
    "G:GROUP_ID3",
    "G:GROUP_ID4"
  ]

  for provider_identifier <- existing_identifiers do
    Fixtures.Actors.create_group(
      account: account,
      provider: provider,
      provider_identifier: provider_identifier
    )
  end

  # The sync must be refused instead of deleting anything.
  assert sync_provider_groups(provider, []) ==
           {:error, "Sync deletion of groups too large"}

  # All five groups are still present and none is marked as deleted.
  assert Repo.aggregate(Actors.Group, :count) == 5
  assert Repo.aggregate(Actors.Group.Query.not_deleted(), :count) == 5
end
test "ignores groups that are not synced from the provider", %{
account: account,
provider: provider

View File

@@ -267,6 +267,18 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
"raw_attributes" => %{}
}
group_finance = %{
"id" => "GROUP_FINANCE_ID",
"object" => "directory_group",
"idp_id" => "finance",
"directory_id" => "dir_123",
"organization_id" => "org_123",
"name" => "Finance",
"created_at" => "2021-10-28 15:21:50.640958",
"updated_at" => "2021-12-14 12:15:45.531847",
"raw_attributes" => %{}
}
users = [
%{
"id" => "workos_user_jdoe_id",
@@ -372,6 +384,13 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
provider_identifier: "G:GROUP_ALL_ID"
)
_group_finance =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_FINANCE_ID"
)
deleted_group =
Fixtures.Actors.create_group(
account: account,
@@ -410,7 +429,12 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}")
WorkOSDirectory.mock_list_directories_endpoint(bypass)
WorkOSDirectory.mock_list_users_endpoint(bypass, users)
WorkOSDirectory.mock_list_groups_endpoint(bypass, [group_all, group_engineering])
WorkOSDirectory.mock_list_groups_endpoint(bypass, [
group_all,
group_engineering,
group_finance
])
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok

View File

@@ -356,7 +356,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
groups = [
%{"id" => "GROUP_ALL_ID", "displayName" => "All"},
%{"id" => "GROUP_ENGINEERING_ID", "displayName" => "Engineering"}
%{"id" => "GROUP_ENGINEERING_ID", "displayName" => "Engineering"},
%{"id" => "GROUP_FINANCE_ID", "displayName" => "Finance"}
]
one_member = [
@@ -433,6 +434,13 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
provider_identifier: "G:GROUP_ALL_ID"
)
_finance_group =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_FINANCE_ID"
)
deleted_group =
Fixtures.Actors.create_group(
account: account,
@@ -494,6 +502,13 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
Jason.encode!(%{"value" => one_member})
)
MicrosoftEntraDirectory.mock_group_members_list_endpoint(
bypass,
"GROUP_FINANCE_ID",
200,
Jason.encode!(%{"value" => []})
)
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok

View File

@@ -501,6 +501,40 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
"href" => "http://localhost:#{bypass.port}/api/v1/groups/00gezqfqxwa2ohLhp5d7/apps"
}
}
},
%{
"id" => "GROUP_FINANCE_ID",
"created" => "2024-02-07T05:30:49.000Z",
"lastUpdated" => "2024-02-07T05:30:49.000Z",
"lastMembershipUpdated" => "2024-02-07T05:32:23.000Z",
"objectClass" => [
"okta:user_group"
],
"type" => "OKTA_GROUP",
"profile" => %{
"name" => "Finance",
"description" => "Finance Dept"
},
"_links" => %{
"logo" => [
%{
"name" => "medium",
"href" => "http://localhost/md/image.png",
"type" => "image/png"
},
%{
"name" => "large",
"href" => "http://localhost/lg/image.png",
"type" => "image/png"
}
],
"users" => %{
"href" => "http://localhost:#{bypass.port}/api/v1/groups/00gezqfqxwa2ohLmp5d7/users"
},
"apps" => %{
"href" => "http://localhost:#{bypass.port}/api/v1/groups/00gezqfqxwa2ohLmp5d7/apps"
}
}
}
]
@@ -642,6 +676,13 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
provider_identifier: "G:GROUP_ENGINEERING_ID"
)
_group2 =
Fixtures.Actors.create_group(
account: account,
provider: provider,
provider_identifier: "G:GROUP_FINANCE_ID"
)
deleted_group =
Fixtures.Actors.create_group(
account: account,
@@ -694,6 +735,13 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
Jason.encode!(one_member)
)
OktaDirectory.mock_group_members_list_endpoint(
bypass,
"GROUP_FINANCE_ID",
200,
Jason.encode!([])
)
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok

View File

@@ -318,7 +318,8 @@ defmodule Domain.Auth.Adapters.OpenIDConnectTest do
Fixtures.Auth.create_identity(
account: account,
provider: provider,
provider_identifier: sub
provider_identifier: sub,
email: Fixtures.Auth.email()
)
identity =

View File

@@ -1839,7 +1839,7 @@ defmodule Domain.AuthTest do
end
test "deletes removed identities", %{account: account, provider: provider} do
provider_identifiers = ["USER_ID1", "USER_ID2"]
provider_identifiers = ["USER_ID1", "USER_ID2", "USER_ID3", "USER_ID4", "USER_ID5"]
deleted_identity_actor = Fixtures.Actors.create_actor(account: account)
@@ -1872,33 +1872,67 @@ defmodule Domain.AuthTest do
token_id: deleted_identity_token.id
)
Fixtures.Auth.create_identity(
account: account,
provider: provider,
provider_identifier: Enum.at(provider_identifiers, 1)
)
for n <- 1..4 do
Fixtures.Auth.create_identity(
account: account,
provider: provider,
provider_identifier: Enum.at(provider_identifiers, n)
)
end
:ok = Phoenix.PubSub.subscribe(Domain.PubSub, "sessions:#{deleted_identity_token.id}")
:ok = Domain.Flows.subscribe_to_flow_expiration_events(deleted_identity_flow)
attrs_list = []
attrs_list = [
%{
"actor" => %{
"name" => "Joe Smith",
"type" => "account_user"
},
"provider_identifier" => "USER_ID3"
},
%{
"actor" => %{
"name" => "Jennie Smith",
"type" => "account_user"
},
"provider_identifier" => "USER_ID4"
},
%{
"actor" => %{
"name" => "Jane Doe",
"type" => "account_admin_user"
},
"provider_identifier" => "USER_ID5"
}
]
assert {:ok,
%{
identities: [_identity1, _identity2],
plan: {[], [], delete},
identities: [_id1, _id2, _id3, _id4, _id5],
plan: {[], ["USER_ID5", "USER_ID4", "USER_ID3"], delete},
deleted: [deleted_identity1, deleted_identity2],
inserted: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = sync_provider_identities(provider, attrs_list)
assert Enum.all?(provider_identifiers, &(&1 in delete))
assert Enum.take(provider_identifiers, 2)
|> Enum.all?(&(&1 in delete))
assert deleted_identity1.provider_identifier in delete
assert deleted_identity2.provider_identifier in delete
assert Repo.aggregate(Auth.Identity, :count) == 9
assert Repo.aggregate(Auth.Identity.Query.not_deleted(), :count) == 7
assert Enum.empty?(actor_ids_by_provider_identifier)
assert Auth.Identity.Query.all()
|> Auth.Identity.Query.by_provider_id(provider.id)
|> Repo.aggregate(:count) == 5
assert Auth.Identity.Query.not_deleted()
|> Auth.Identity.Query.by_provider_id(provider.id)
|> Repo.aggregate(:count) == 3
assert actor_ids_by_provider_identifier
|> Map.keys()
|> length() == 3
# Signs out users which identity has been deleted
topic = "sessions:#{deleted_identity_token.id}"
@@ -1909,6 +1943,34 @@ defmodule Domain.AuthTest do
assert_receive {:expire_flow, ^flow_id, _client_id, _resource_id}
end
test "circuit breaker prevents mass deletions of identities", %{
  account: account,
  provider: provider
} do
  # Seed five provider-synced identities; an empty sync payload would then
  # plan to delete every one of them.
  Enum.each(["USER_ID1", "USER_ID2", "USER_ID3", "USER_ID4", "USER_ID5"], fn identifier ->
    Fixtures.Auth.create_identity(
      account: account,
      provider: provider,
      provider_identifier: identifier
    )
  end)

  # The sync must be refused instead of deleting anything.
  assert {:error, "Sync deletion of identities too large"} =
           sync_provider_identities(provider, [])

  # All five identities of this provider still exist...
  total_for_provider =
    Auth.Identity.Query.all()
    |> Auth.Identity.Query.by_provider_id(provider.id)
    |> Repo.aggregate(:count)

  assert total_for_provider == 5

  # ...and none of them has been marked as deleted.
  remaining_for_provider =
    Auth.Identity.Query.not_deleted()
    |> Auth.Identity.Query.by_provider_id(provider.id)
    |> Repo.aggregate(:count)

  assert remaining_for_provider == 5
end
test "ignores identities that are not synced from the provider", %{
account: account,
provider: provider