diff --git a/elixir/apps/domain/lib/domain/actors.ex b/elixir/apps/domain/lib/domain/actors.ex index 4bf8d878d..37e08e20d 100644 --- a/elixir/apps/domain/lib/domain/actors.ex +++ b/elixir/apps/domain/lib/domain/actors.ex @@ -121,12 +121,22 @@ defmodule Domain.Actors do end end - def sync_provider_groups_multi(%Auth.Provider{} = provider, attrs_list) do - Group.Sync.sync_provider_groups_multi(provider, attrs_list) + def sync_provider_groups(%Auth.Provider{} = provider, attrs_list) do + Group.Sync.sync_provider_groups(provider, attrs_list) end - def sync_provider_memberships_multi(multi, %Auth.Provider{} = provider, tuples) do - Membership.Sync.sync_provider_memberships_multi(multi, provider, tuples) + def sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + %Auth.Provider{} = provider, + tuples + ) do + Membership.Sync.sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples + ) end def new_group(attrs \\ %{}) do diff --git a/elixir/apps/domain/lib/domain/actors/group/sync.ex b/elixir/apps/domain/lib/domain/actors/group/sync.ex index 69a009506..8b0ba7de9 100644 --- a/elixir/apps/domain/lib/domain/actors/group/sync.ex +++ b/elixir/apps/domain/lib/domain/actors/group/sync.ex @@ -1,9 +1,10 @@ defmodule Domain.Actors.Group.Sync do + alias Domain.Repo alias Domain.Auth alias Domain.Actors alias Domain.Actors.Group - def sync_provider_groups_multi(%Auth.Provider{} = provider, attrs_list) do + def sync_provider_groups(%Auth.Provider{} = provider, attrs_list) do attrs_by_provider_identifier = for attrs <- attrs_list, into: %{} do {Map.fetch!(attrs, "provider_identifier"), attrs} @@ -11,46 +12,36 @@ defmodule Domain.Actors.Group.Sync do provider_identifiers = Map.keys(attrs_by_provider_identifier) - Ecto.Multi.new() - |> Ecto.Multi.all(:groups, fn _effects_so_far -> - fetch_provider_groups_query(provider) - end) - |> Ecto.Multi.run(:plan_groups, fn _repo, %{groups: groups} -> - plan_groups_update(groups, provider_identifiers) - end) - |> Ecto.Multi.run( - :delete_groups, - fn repo, %{plan_groups: {_upsert, delete}} -> - delete_groups(repo, provider, delete) - end - ) - |> Ecto.Multi.run(:upsert_groups, fn repo, %{plan_groups: {upsert, _delete}} -> - upsert_groups(repo, provider, attrs_by_provider_identifier, upsert) - end) - |> Ecto.Multi.run( - :group_ids_by_provider_identifier, - fn _repo, - %{ - plan_groups: {_upsert, delete}, - groups: groups, - upsert_groups: upsert_groups - } -> - group_ids_by_provider_identifier = - for group <- groups ++ upsert_groups, - group.provider_identifier not in delete, - into: %{} do - {group.provider_identifier, group.id} - end + with {:ok, groups} <- all_provider_groups(provider), + {:ok, {upsert, delete}} <- plan_groups_update(groups, provider_identifiers), + {:ok, deleted} <- delete_groups(provider, delete), + {:ok, upserted} <- upsert_groups(provider, attrs_by_provider_identifier, upsert) do + group_ids_by_provider_identifier = + for group <- groups ++ upserted, + group.provider_identifier not in delete, + into: %{} do + {group.provider_identifier, group.id} + end - {:ok, group_ids_by_provider_identifier} - end - ) + {:ok, + %{ + groups: groups, + plan: {upsert, delete}, + deleted: deleted, + upserted: upserted, + group_ids_by_provider_identifier: group_ids_by_provider_identifier + }} + end end - defp fetch_provider_groups_query(provider) do - Group.Query.not_deleted() - |> Group.Query.by_account_id(provider.account_id) - |> Group.Query.by_provider_id(provider.id) + defp all_provider_groups(provider) do + groups = + Group.Query.not_deleted() + |> Group.Query.by_account_id(provider.account_id) + |> Group.Query.by_provider_id(provider.id) + |> Repo.all() + + {:ok, groups} end defp plan_groups_update(groups, provider_identifiers) do @@ -66,7 +57,7 @@ defmodule Domain.Actors.Group.Sync do {:ok, {upsert, delete}} end - defp delete_groups(_repo, provider, provider_identifiers_to_delete) do + defp delete_groups(provider, provider_identifiers_to_delete) do Group.Query.not_deleted() |> Group.Query.by_account_id(provider.account_id) |> Group.Query.by_provider_id(provider.id) @@ -74,13 +65,13 @@ defmodule Domain.Actors.Group.Sync do |> Actors.delete_groups() end - defp upsert_groups(repo, provider, attrs_by_provider_identifier, provider_identifiers_to_upsert) do + defp upsert_groups(provider, attrs_by_provider_identifier, provider_identifiers_to_upsert) do provider_identifiers_to_upsert |> Enum.reduce_while({:ok, []}, fn provider_identifier, {:ok, acc} -> attrs = Map.get(attrs_by_provider_identifier, provider_identifier) attrs = Map.put(attrs, "type", :managed) - case upsert_group(repo, provider, attrs) do + case upsert_group(Repo, provider, attrs) do {:ok, group} -> {:cont, {:ok, [group | acc]}} diff --git a/elixir/apps/domain/lib/domain/actors/membership/sync.ex b/elixir/apps/domain/lib/domain/actors/membership/sync.ex index 835af7d86..7e162acc7 100644 --- a/elixir/apps/domain/lib/domain/actors/membership/sync.ex +++ b/elixir/apps/domain/lib/domain/actors/membership/sync.ex @@ -1,61 +1,57 @@ defmodule Domain.Actors.Membership.Sync do + alias Domain.Repo alias Domain.Auth alias Domain.Actors alias Domain.Actors.Membership - def sync_provider_memberships_multi(multi, %Auth.Provider{} = provider, tuples) do - multi - |> Ecto.Multi.all(:memberships, fn _effects_so_far -> - fetch_provider_memberships_query(provider) - end) - |> Ecto.Multi.run( - :plan_memberships, - fn _repo, - %{ - actor_ids_by_provider_identifier: actor_ids_by_provider_identifier, - group_ids_by_provider_identifier: group_ids_by_provider_identifier, - memberships: memberships - } -> - tuples = - Enum.flat_map(tuples, fn {group_provider_identifier, actor_provider_identifier} -> - group_id = Map.get(group_ids_by_provider_identifier, group_provider_identifier) - actor_id = Map.get(actor_ids_by_provider_identifier, actor_provider_identifier) + def sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + %Auth.Provider{} = provider, + tuples + ) do + tuples = + Enum.flat_map(tuples, fn {group_provider_identifier, actor_provider_identifier} -> + group_id = Map.get(group_ids_by_provider_identifier, group_provider_identifier) + actor_id = Map.get(actor_ids_by_provider_identifier, actor_provider_identifier) - if is_nil(group_id) or is_nil(actor_id) do - [] - else - [{group_id, actor_id}] - end - end) + if is_nil(group_id) or is_nil(actor_id) do + [] + else + [{group_id, actor_id}] + end + end) - plan_memberships_update(tuples, memberships) - end - ) - |> Ecto.Multi.delete_all(:delete_memberships, fn %{plan_memberships: {_insert, delete}} -> - delete_memberships_query(delete) - end) - |> Ecto.Multi.run(:insert_memberships, fn repo, %{plan_memberships: {insert, _delete}} -> - insert_memberships(repo, provider, insert) - end) - |> Ecto.Multi.run(:broadcast_membership_updates, fn - _repo, %{plan_memberships: {insert, delete}} -> - :ok = - Enum.each(insert, fn {group_id, actor_id} -> - Actors.broadcast_membership_event(:create, actor_id, group_id) - end) + with {:ok, memberships} <- all_provider_memberships(provider), + {:ok, {insert, delete}} <- plan_memberships_update(tuples, memberships), + deleted_stats = delete_memberships(delete), + {:ok, inserted} <- insert_memberships(provider, insert) do + :ok = + Enum.each(insert, fn {group_id, actor_id} -> + Actors.broadcast_membership_event(:create, actor_id, group_id) + end) - :ok = - Enum.each(delete, fn {group_id, actor_id} -> - Actors.broadcast_membership_event(:delete, actor_id, group_id) - end) + :ok = + Enum.each(delete, fn {group_id, actor_id} -> + Actors.broadcast_membership_event(:delete, actor_id, group_id) + end) - {:ok, :ok} - end) + {:ok, + %{ + plan: {insert, delete}, + inserted: inserted, + deleted_stats: deleted_stats + }} + end end - defp fetch_provider_memberships_query(provider) do - Membership.Query.by_account_id(provider.account_id) - |> Membership.Query.by_group_provider_id(provider.id) + defp all_provider_memberships(provider) do + memberships = + Membership.Query.by_account_id(provider.account_id) + |> Membership.Query.by_group_provider_id(provider.id) + |> Repo.all() + + {:ok, memberships} end defp plan_memberships_update(tuples, memberships) do @@ -77,16 +73,17 @@ defmodule Domain.Actors.Membership.Sync do {:ok, {insert, delete}} end - defp delete_memberships_query(provider_identifiers_to_delete) do + defp delete_memberships(provider_identifiers_to_delete) do Membership.Query.by_group_id_and_actor_id({:in, provider_identifiers_to_delete}) + |> Repo.delete_all() end - defp insert_memberships(repo, provider, provider_identifiers_to_insert) do + defp insert_memberships(provider, provider_identifiers_to_insert) do provider_identifiers_to_insert |> Enum.reduce_while({:ok, []}, fn {group_id, actor_id}, {:ok, acc} -> attrs = %{group_id: group_id, actor_id: actor_id} - case upsert_membership(repo, provider, attrs) do + case upsert_membership(provider, attrs) do {:ok, membership} -> {:cont, {:ok, [membership | acc]}} @@ -96,9 +93,9 @@ defmodule Domain.Actors.Membership.Sync do end) end - defp upsert_membership(repo, provider, attrs) do + defp upsert_membership(provider, attrs) do Membership.Changeset.upsert(provider.account_id, %Membership{}, attrs) - |> repo.insert( + |> Repo.insert( conflict_target: Membership.Changeset.upsert_conflict_target(), on_conflict: Membership.Changeset.upsert_on_conflict(), returning: true diff --git a/elixir/apps/domain/lib/domain/auth.ex b/elixir/apps/domain/lib/domain/auth.ex index 338245037..5d4149301 100644 --- a/elixir/apps/domain/lib/domain/auth.ex +++ b/elixir/apps/domain/lib/domain/auth.ex @@ -360,8 +360,8 @@ defmodule Domain.Auth do end end - def sync_provider_identities_multi(%Provider{} = provider, attrs_list) do - Identity.Sync.sync_provider_identities_multi(provider, attrs_list) + def sync_provider_identities(%Provider{} = provider, attrs_list) do + Identity.Sync.sync_provider_identities(provider, attrs_list) end def all_actor_ids_by_membership_rules!(account_id, membership_rules) do @@ -789,6 +789,7 @@ defmodule Domain.Auth do {:error, :not_found} end + # Maybe we need a NOWAIT here to prevent timeouts when background jobs are updating the identity defp maybe_fetch_subject_identity(subject, token) do Identity.Query.not_disabled() |> Identity.Query.by_id(token.identity_id) diff --git a/elixir/apps/domain/lib/domain/auth/adapter/directory_sync.ex b/elixir/apps/domain/lib/domain/auth/adapter/directory_sync.ex index f3a942146..a0afa1b99 100644 --- a/elixir/apps/domain/lib/domain/auth/adapter/directory_sync.ex +++ b/elixir/apps/domain/lib/domain/auth/adapter/directory_sync.ex @@ -1,9 +1,20 @@ defmodule Domain.Auth.Adapter.DirectorySync do alias Domain.Repo + alias Domain.Jobs.Executors.Concurrent alias Domain.{Auth, Actors} require Logger + require OpenTelemetry.Tracer - @async_data_fetch_timeout :infinity + # The Finch will timeout requests after 30 seconds, + # but there are a lot of requests that need to be made + # so we don't want to limit the timeout here + @async_data_fetch_timeout :timer.minutes(30) + + # This timeout is used to limit the time spent on a single provider + # inserting the records into the database + @database_operations_timeout :timer.minutes(30) + + @provider_sync_timeout @async_data_fetch_timeout + @database_operations_timeout @doc """ Returns a tuple with the data needed to sync all entities of the provider. @@ -51,7 +62,7 @@ defmodule Domain.Auth.Adapter.DirectorySync do Where `user_message` user message will be rendered in the UI and `log_message` will be logged with a level that corresponds to number of retries (see `log_sync_error/2`). """ - @callback gather_provider_data(%Auth.Provider{}) :: + @callback gather_provider_data(%Auth.Provider{}, task_supervisor_pid :: pid()) :: {:ok, { identities_attrs :: [ @@ -70,95 +81,206 @@ defmodule Domain.Auth.Adapter.DirectorySync do | {:error, {:unauthorized, user_message :: String.t()}} | {:error, user_message :: String.t(), log_message :: String.t()} - def sync_providers(module, providers) do - providers - |> Domain.Repo.preload(:account) - |> Enum.each(&sync_provider(module, &1)) + def sync_providers(module, adapter, supervisor_pid) do + start_time = System.monotonic_time(:millisecond) + + Domain.Repo.transaction( + fn -> + metadata = Logger.metadata() + pdict = Process.get() + + all_providers = Domain.Auth.all_providers_pending_sync_by_adapter!(adapter) + providers = Concurrent.reject_locked("auth_providers", all_providers) + providers = Domain.Repo.preload(providers, :account) + Logger.info("Syncing #{length(providers)}/#{length(all_providers)} #{adapter} providers") + + Task.Supervisor.async_stream_nolink( + supervisor_pid, + providers, + fn provider -> + OpenTelemetry.Tracer.with_span "sync_provider", + attributes: %{ + account_id: provider.account_id, + provider_id: provider.id, + provider_adapter: provider.adapter + } do + :ok = maybe_reuse_connection(pdict) + Logger.metadata(metadata) + + Logger.metadata( + account_id: provider.account_id, + provider_id: provider.id, + provider_adapter: provider.adapter + ) + + sync_provider(module, provider) + end + end, + timeout: @provider_sync_timeout, + max_concurrency: 3 + ) + |> Stream.run() + end, + # sync can take a long time so we will manage timeouts for each provider separately + timeout: :infinity + ) + + finish_time = System.monotonic_time(:millisecond) + Logger.info("Finished syncing in #{time_taken(start_time, finish_time)}") end defp sync_provider(module, provider) do - Logger.debug("Syncing provider", - account_id: provider.account_id, - provider_id: provider.id, - provider_adapter: provider.adapter - ) + start_time = System.monotonic_time(:millisecond) + Logger.debug("Syncing provider") - with true <- Domain.Accounts.idp_sync_enabled?(provider.account), - {:ok, {identities_attrs, actor_groups_attrs, membership_tuples}} <- - module.gather_provider_data(provider) do - Ecto.Multi.new() - |> Ecto.Multi.one(:lock_provider, fn _effects_so_far -> - Auth.Provider.Query.not_disabled() - |> Auth.Provider.Query.by_account_id(provider.account_id) - |> Auth.Provider.Query.by_id(provider.id) - |> Auth.Provider.Query.lock() - end) - |> Ecto.Multi.append(Auth.sync_provider_identities_multi(provider, identities_attrs)) - |> Ecto.Multi.append(Actors.sync_provider_groups_multi(provider, actor_groups_attrs)) - |> Actors.sync_provider_memberships_multi(provider, membership_tuples) - |> Ecto.Multi.update(:save_last_updated_at, fn _effects_so_far -> - Auth.Provider.Changeset.sync_finished(provider) - end) - |> Repo.transaction(timeout: :timer.minutes(30)) - |> case do - {:ok, effects} -> - log_sync_result(provider, effects) - :ok + {:ok, pid} = Task.Supervisor.start_link() - {:error, reason} -> - log_sync_error( - provider, - "Repo error: " <> inspect(reason) - ) + if Domain.Accounts.idp_sync_enabled?(provider.account) do + with {:ok, data, data_fetch_time_taken} <- fetch_provider_data(module, provider, pid), + {:ok, db_operations_time_taken} <- insert_provider_updates(provider, data) do + finish_time = System.monotonic_time(:millisecond) - {:error, step, reason, _effects_so_far} -> - log_sync_error( - provider, - "Multi error at step " <> inspect(step) <> ": " <> inspect(reason) - ) + :telemetry.execute( + [:domain, :directory_sync], + %{ + data_fetch_total_time: data_fetch_time_taken, + db_operations_total_time: db_operations_time_taken, + total_time: finish_time - start_time + }, + %{ + account_id: provider.account_id, + provider_id: provider.id, + provider_adapter: provider.adapter + } + ) + + Logger.debug("Finished syncing provider in #{time_taken(start_time, finish_time)}") + else + _other -> + finish_time = System.monotonic_time(:millisecond) + Logger.debug("Failed to sync provider in #{time_taken(start_time, finish_time)}") end else - false -> - message = "IdP sync is not enabled in your subscription plan" + message = "IdP sync is not enabled in your subscription plan" - Auth.Provider.Changeset.sync_failed(provider, message) - |> Domain.Repo.update!() - - {:error, {:unauthorized, user_message}} -> - Auth.Provider.Changeset.sync_requires_manual_intervention(provider, user_message) - |> Domain.Repo.update!() - - {:error, nil, log_message} -> - log_sync_error(provider, log_message) - - {:error, user_message, log_message} -> - Auth.Provider.Changeset.sync_failed(provider, user_message) - |> Domain.Repo.update!() - |> log_sync_error(log_message) + Auth.Provider.Changeset.sync_failed(provider, message) + |> Domain.Repo.update!() end end - defp log_sync_result(provider, effects) do - %{ - # Identities - plan_identities: {identities_insert_ids, identities_update_ids, identities_delete_ids}, - insert_identities: identities_inserted, - update_identities_and_actors: identities_updated, - delete_identities: identities_deleted, - # Groups - plan_groups: {groups_upsert_ids, groups_delete_ids}, - upsert_groups: groups_upserted, - delete_groups: groups_deleted, - # Memberships - plan_memberships: {memberships_insert_tuples, memberships_delete_tuples}, - insert_memberships: memberships_inserted, - delete_memberships: {deleted_memberships_count, _} - } = effects + defp fetch_provider_data(module, provider, task_supervisor_pid) do + OpenTelemetry.Tracer.with_span "sync_provider.fetch_data" do + start_time = System.monotonic_time(:millisecond) - Logger.debug("Finished syncing provider", - provider_id: provider.id, - provider_adapter: provider.adapter, - account_id: provider.account_id, + with {:ok, data} <- module.gather_provider_data(provider, task_supervisor_pid) do + finish_time = System.monotonic_time(:millisecond) + time_taken = time_taken(start_time, finish_time) + + Logger.debug( + "Finished fetching data for provider in #{time_taken}", + account_id: provider.account_id, + provider_id: provider.id, + provider_adapter: provider.adapter, + time_taken: time_taken + ) + + {:ok, data, finish_time - start_time} + else + {:error, {:unauthorized, user_message}} -> + OpenTelemetry.Tracer.set_status(:error, inspect(user_message)) + + Auth.Provider.Changeset.sync_requires_manual_intervention(provider, user_message) + |> Domain.Repo.update!() + + {:error, nil, log_message} -> + OpenTelemetry.Tracer.set_status(:error, inspect(log_message)) + + log_sync_error(provider, log_message) + + {:error, user_message, log_message} -> + OpenTelemetry.Tracer.set_status(:error, inspect(log_message)) + + Auth.Provider.Changeset.sync_failed(provider, user_message) + |> Domain.Repo.update!() + |> log_sync_error(log_message) + end + end + end + + defp insert_provider_updates( + provider, + {identities_attrs, actor_groups_attrs, membership_tuples} + ) do + OpenTelemetry.Tracer.with_span "sync_provider.insert_data" do + Repo.checkout( + fn -> + start_time = System.monotonic_time(:millisecond) + + with {:ok, identities_effects} <- + Auth.sync_provider_identities(provider, identities_attrs), + {:ok, groups_effects} <- Actors.sync_provider_groups(provider, actor_groups_attrs), + {:ok, memberships_effects} <- + Actors.sync_provider_memberships( + identities_effects.actor_ids_by_provider_identifier, + groups_effects.group_ids_by_provider_identifier, + provider, + membership_tuples + ) do + Auth.Provider.Changeset.sync_finished(provider) + |> Repo.update!() + + finish_time = System.monotonic_time(:millisecond) + + log_sync_result( + start_time, + finish_time, + identities_effects, + groups_effects, + memberships_effects + ) + + {:ok, finish_time - start_time} + else + {:error, reason} -> + OpenTelemetry.Tracer.set_status(:error, inspect(reason)) + log_sync_error(provider, "Repo error: " <> inspect(reason)) + end + end, + timeout: @database_operations_timeout + ) + end + end + + defp log_sync_result( + start_time, + finish_time, + identities_effects, + groups_effects, + memberships_effects + ) do + %{ + plan: {identities_insert_ids, identities_update_ids, identities_delete_ids}, + inserted: identities_inserted, + updated: identities_updated, + deleted: identities_deleted + } = identities_effects + + %{ + plan: {groups_upsert_ids, groups_delete_ids}, + upserted: groups_upserted, + deleted: groups_deleted + } = groups_effects + + %{ + plan: {memberships_insert_tuples, memberships_delete_tuples}, + inserted: memberships_inserted, + deleted_stats: {deleted_memberships_count, _} + } = memberships_effects + + time_taken = time_taken(start_time, finish_time) + + Logger.debug("Finished syncing provider in #{time_taken}", + time_taken: time_taken, # Identities plan_identities_insert: length(identities_insert_ids), plan_identities_update: length(identities_update_ids), @@ -179,6 +301,12 @@ defmodule Domain.Auth.Adapter.DirectorySync do ) end + defp time_taken(start_time, finish_time) do + ~T[00:00:00] + |> Time.add(finish_time - start_time, :millisecond) + |> to_string() + end + defp log_sync_error(provider, message) do metadata = [ account_id: provider.account_id, @@ -213,7 +341,10 @@ defmodule Domain.Auth.Adapter.DirectorySync do Process.put(:last_caller_pid, caller_pid) OpenTelemetry.Ctx.attach(opentelemetry_ctx) OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) - callback.() + + OpenTelemetry.Tracer.with_span "sync_provider.fetch_data.#{name}" do + callback.() + end end) {name, task} @@ -231,4 +362,26 @@ defmodule Domain.Auth.Adapter.DirectorySync do end end) end + + if Mix.env() == :test do + # We need this function to reuse the connection that was checked out in a parent process. + # + # `Ecto.SQL.Sandbox.allow/3` will not work in this case because it will try to checkout the same connection + # that is held by the parent process by `Repo.checkout/2` which will lead to a timeout, so we need to hack + # and reuse it manually. + def maybe_reuse_connection(pdict) do + pdict + |> Enum.filter(fn + {{Ecto.Adapters.SQL, _pid}, _} -> true + _ -> false + end) + |> Enum.each(fn {key, value} -> + Process.put(key, value) + end) + end + else + def maybe_reuse_connection(_pdict) do + :ok + end + end end diff --git a/elixir/apps/domain/lib/domain/auth/adapters/google_workspace.ex b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace.ex index ace32cdf0..116811eaa 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/google_workspace.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace.ex @@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace do def init(_init_arg) do children = [ GoogleWorkspace.APIClient, - {Task.Supervisor, name: __MODULE__.TaskSupervisor}, - {Domain.Jobs, GoogleWorkspace.Jobs} + # Background Jobs + GoogleWorkspace.Jobs.RefreshAccessTokens, + GoogleWorkspace.Jobs.SyncDirectory ] Supervisor.init(children, strategy: :one_for_one) diff --git a/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens.ex b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens.ex new file mode 100644 index 000000000..f4a3e54ee --- /dev/null +++ b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens.ex @@ -0,0 +1,37 @@ +defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokens do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.GloballyUnique + + alias Domain.Auth.Adapters.GoogleWorkspace + require Logger + + @impl true + def execute(_config) do + providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:google_workspace) + Logger.debug("Refreshing access tokens for #{length(providers)} providers") + + Enum.each(providers, fn provider -> + Logger.debug("Refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + case GoogleWorkspace.refresh_access_token(provider) do + {:ok, provider} -> + Logger.debug("Finished refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + {:error, reason} -> + Logger.error("Failed refreshing access token", + provider_id: provider.id, + account_id: provider.account_id, + reason: inspect(reason) + ) + end + end) + end +end diff --git a/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs.ex b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/sync_directory.ex similarity index 69% rename from elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs.ex rename to elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/sync_directory.ex index d0af08971..c104713dc 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/google_workspace/jobs/sync_directory.ex @@ -1,49 +1,31 @@ -defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs do - use Domain.Jobs.Recurrent, otp_app: :domain +defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(2), + executor: Domain.Jobs.Executors.Concurrent + alias Domain.Auth.Adapter.DirectorySync alias Domain.Auth.Adapters.GoogleWorkspace require Logger - @behaviour DirectorySync + @task_supervisor __MODULE__.TaskSupervisor - every minutes(5), :refresh_access_tokens do - providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:google_workspace) - Logger.debug("Refreshing access tokens for #{length(providers)} providers") - - Enum.each(providers, fn provider -> - Logger.debug("Refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - case GoogleWorkspace.refresh_access_token(provider) do - {:ok, provider} -> - Logger.debug("Finished refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - {:error, reason} -> - Logger.error("Failed refreshing access token", - provider_id: provider.id, - account_id: provider.account_id, - reason: inspect(reason) - ) - end - end) + @impl true + def state(_config) do + {:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor) + {:ok, %{task_supervisor: pid}} end - every minutes(3), :sync_directory do - providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:google_workspace) - Logger.debug("Syncing #{length(providers)} Google Workspace providers") - DirectorySync.sync_providers(__MODULE__, providers) + @impl true + def execute(%{task_supervisor: pid}) do + DirectorySync.sync_providers(__MODULE__, :google_workspace, pid) end - def gather_provider_data(provider) do + def gather_provider_data(provider, task_supervisor_pid) do access_token = provider.adapter_state["access_token"] async_results = - DirectorySync.run_async_requests(GoogleWorkspace.TaskSupervisor, + DirectorySync.run_async_requests(task_supervisor_pid, users: fn -> GoogleWorkspace.APIClient.list_users(access_token) end, diff --git a/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra.ex b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra.ex index 387f67ef9..542fdb3d9 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra.ex @@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra do def init(_init_arg) do children = [ MicrosoftEntra.APIClient, - {Task.Supervisor, name: __MODULE__.TaskSupervisor}, - {Domain.Jobs, MicrosoftEntra.Jobs} + # Background Jobs + MicrosoftEntra.Jobs.RefreshAccessTokens, + MicrosoftEntra.Jobs.SyncDirectory ] Supervisor.init(children, strategy: :one_for_one) diff --git a/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens.ex b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens.ex new file mode 100644 index 000000000..435d54f35 --- /dev/null +++ b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens.ex @@ -0,0 +1,37 @@ +defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokens do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.GloballyUnique + + alias Domain.Auth.Adapters.MicrosoftEntra + require Logger + + @impl true + def execute(_config) do + providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:microsoft_entra) + Logger.debug("Refreshing access tokens for #{length(providers)} Microsoft Entra providers") + + Enum.each(providers, fn provider -> + Logger.debug("Refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + case MicrosoftEntra.refresh_access_token(provider) do + {:ok, provider} -> + Logger.debug("Finished refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + {:error, reason} -> + Logger.error("Failed refreshing access token", + provider_id: provider.id, + account_id: provider.account_id, + reason: inspect(reason) + ) + end + end) + end +end diff --git a/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs.ex b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/sync_directory.ex similarity index 61% rename from elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs.ex rename to elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/sync_directory.ex index 8acf9d120..c5342c338 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/microsoft_entra/jobs/sync_directory.ex @@ -1,49 +1,31 @@ -defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs do - use Domain.Jobs.Recurrent, otp_app: :domain +defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.Concurrent + alias Domain.Auth.Adapter.DirectorySync alias Domain.Auth.Adapters.MicrosoftEntra require Logger - @behaviour DirectorySync + @task_supervisor __MODULE__.TaskSupervisor - every minutes(5), :refresh_access_tokens do - providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:microsoft_entra) - Logger.debug("Refreshing access tokens for #{length(providers)} Microsoft Entra providers") - - Enum.each(providers, fn provider -> - Logger.debug("Refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - case MicrosoftEntra.refresh_access_token(provider) do - {:ok, provider} -> - Logger.debug("Finished refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - {:error, reason} -> - Logger.error("Failed refreshing access token", - provider_id: provider.id, - account_id: provider.account_id, - reason: inspect(reason) - ) - end - end) + @impl true + def state(_config) do + {:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor) + {:ok, %{task_supervisor: pid}} end - every minutes(3), :sync_directory do - providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:microsoft_entra) - Logger.debug("Syncing #{length(providers)} Microsoft Entra providers") - DirectorySync.sync_providers(__MODULE__, providers) + @impl true + def execute(%{task_supervisor: pid}) do + DirectorySync.sync_providers(__MODULE__, :microsoft_entra, pid) end - def gather_provider_data(provider) do + def gather_provider_data(provider, task_supervisor_pid) do access_token = provider.adapter_state["access_token"] async_results = - DirectorySync.run_async_requests(MicrosoftEntra.TaskSupervisor, + DirectorySync.run_async_requests(task_supervisor_pid, users: fn -> MicrosoftEntra.APIClient.list_users(access_token) end, diff --git a/elixir/apps/domain/lib/domain/auth/adapters/okta.ex b/elixir/apps/domain/lib/domain/auth/adapters/okta.ex index 65d64d9fd..c6e449d91 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/okta.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/okta.ex @@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.Okta do def init(_init_arg) do children = [ Okta.APIClient, - {Task.Supervisor, name: __MODULE__.TaskSupervisor}, - {Domain.Jobs, Okta.Jobs} + # Background Jobs + Okta.Jobs.RefreshAccessTokens, + Okta.Jobs.SyncDirectory ] Supervisor.init(children, strategy: :one_for_one) diff --git a/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/refresh_access_tokens.ex b/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/refresh_access_tokens.ex new file mode 100644 index 000000000..eaaffb87d --- /dev/null +++ b/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/refresh_access_tokens.ex @@ -0,0 +1,37 @@ +defmodule Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokens do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.GloballyUnique + + alias Domain.Auth.Adapters.Okta + require Logger + + @impl true + def execute(_config) do + providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:okta) + Logger.debug("Refreshing access tokens for #{length(providers)} Okta providers") + + Enum.each(providers, fn provider -> + Logger.debug("Refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + case Okta.refresh_access_token(provider) do + {:ok, provider} -> + Logger.debug("Finished refreshing access token", + provider_id: provider.id, + account_id: provider.account_id + ) + + {:error, reason} -> + Logger.error("Failed refreshing access token", + provider_id: provider.id, + account_id: provider.account_id, + reason: inspect(reason) + ) + end + end) + end +end diff --git a/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs.ex b/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/sync_directory.ex similarity index 64% rename from elixir/apps/domain/lib/domain/auth/adapters/okta/jobs.ex rename to elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/sync_directory.ex index 62fc21bbf..d26116985 100644 --- a/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs.ex +++ b/elixir/apps/domain/lib/domain/auth/adapters/okta/jobs/sync_directory.ex @@ -1,50 +1,32 @@ -defmodule Domain.Auth.Adapters.Okta.Jobs do - use Domain.Jobs.Recurrent, otp_app: :domain +defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectory do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.Concurrent + alias Domain.Auth.Adapter.DirectorySync alias Domain.Auth.Adapters.Okta require Logger - @behaviour DirectorySync + @task_supervisor __MODULE__.TaskSupervisor - every minutes(5), :refresh_access_tokens do - providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:okta) - Logger.debug("Refreshing access tokens for #{length(providers)} Okta providers") - - Enum.each(providers, fn provider -> - Logger.debug("Refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - case Okta.refresh_access_token(provider) do - {:ok, provider} -> - Logger.debug("Finished refreshing access token", - provider_id: provider.id, - account_id: provider.account_id - ) - - {:error, reason} -> - Logger.error("Failed refreshing access token", - provider_id: provider.id, - account_id: provider.account_id, - reason: inspect(reason) - ) - end - end) + @impl true + def state(_config) do + {:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor) + {:ok, %{task_supervisor: pid}} end - every minutes(3), :sync_directory do - providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:okta) - Logger.debug("Syncing #{length(providers)} Okta providers") - DirectorySync.sync_providers(__MODULE__, providers) + @impl true + def execute(%{task_supervisor: pid}) do + DirectorySync.sync_providers(__MODULE__, :okta, pid) end - def gather_provider_data(provider) do + def gather_provider_data(provider, task_supervisor_pid) do endpoint = provider.adapter_config["api_base_url"] access_token = provider.adapter_state["access_token"] async_results = - DirectorySync.run_async_requests(Okta.TaskSupervisor, + DirectorySync.run_async_requests(task_supervisor_pid, users: fn -> Okta.APIClient.list_users(endpoint, access_token) end, diff --git a/elixir/apps/domain/lib/domain/auth/identity/sync.ex b/elixir/apps/domain/lib/domain/auth/identity/sync.ex index d9151cd7a..5085bd2d6 100644 --- a/elixir/apps/domain/lib/domain/auth/identity/sync.ex +++ b/elixir/apps/domain/lib/domain/auth/identity/sync.ex @@ -1,7 +1,8 @@ defmodule Domain.Auth.Identity.Sync do + alias Domain.Repo alias Domain.Auth.{Identity, Provider} - def sync_provider_identities_multi(%Provider{} = provider, attrs_list) do + def sync_provider_identities(%Provider{} = provider, attrs_list) do attrs_by_provider_identifier = for attrs <- attrs_list, into: %{} do {Map.fetch!(attrs, "provider_identifier"), attrs} @@ -9,56 +10,42 @@ defmodule Domain.Auth.Identity.Sync do provider_identifiers = Map.keys(attrs_by_provider_identifier) - Ecto.Multi.new() - |> Ecto.Multi.all(:identities, fn _effects_so_far -> - fetch_provider_identities_query(provider) - end) - |> Ecto.Multi.run(:plan_identities, fn _repo, %{identities: identities} -> - plan_identities_update(identities, provider_identifiers) - end) - |> Ecto.Multi.run( - :delete_identities, - fn repo, %{plan_identities: {_insert, _update, delete}} -> - delete_identities(repo, provider, delete) - end - ) - |> Ecto.Multi.run( - :insert_identities, - fn repo, %{plan_identities: {insert, _update, _delete}} -> - insert_identities(repo, provider, attrs_by_provider_identifier, insert) - end - ) - |> Ecto.Multi.run( - :update_identities_and_actors, - fn repo, %{identities: identities, plan_identities: {_insert, update, _delete}} -> - update_identities_and_actors(repo, identities, attrs_by_provider_identifier, update) - end - ) - |> Ecto.Multi.run( - :actor_ids_by_provider_identifier, - fn _repo, - %{ - update_identities_and_actors: update_identities, - insert_identities: insert_identities - } -> - actor_ids_by_provider_identifier = - for identity <- update_identities ++ insert_identities, - into: %{} do - {identity.provider_identifier, identity.actor_id} - end - - {:ok, actor_ids_by_provider_identifier} - end - ) - |> Ecto.Multi.run(:recalculate_dynamic_groups, fn _repo, _effects_so_far -> + with {:ok, identities} <- all_provider_identities(provider), + {:ok, {insert, update, delete}} <- + plan_identities_update(identities, provider_identifiers), + {:ok, deleted} <- delete_identities(provider, delete), + {:ok, inserted} <- + insert_identities(provider, attrs_by_provider_identifier, insert), + {:ok, updated} <- + update_identities_and_actors(identities, attrs_by_provider_identifier, update) do Domain.Actors.update_dynamic_group_memberships(provider.account_id) - end) + + actor_ids_by_provider_identifier = + for identity <- updated ++ inserted, + into: %{} do + {identity.provider_identifier, identity.actor_id} + end + + {:ok, + %{ + identities: identities, + plan: {insert, update, delete}, + delete: deleted, + insert: inserted, + update: updated, + actor_ids_by_provider_identifier: actor_ids_by_provider_identifier + }} + end end - defp fetch_provider_identities_query(provider) do - Identity.Query.all() - |> Identity.Query.by_account_id(provider.account_id) - |> Identity.Query.by_provider_id(provider.id) + defp all_provider_identities(provider) do + identities = + Identity.Query.all() + |> Identity.Query.by_account_id(provider.account_id) + |> Identity.Query.by_provider_id(provider.id) + |> Repo.all() + + {:ok, identities} end defp plan_identities_update(identities, provider_identifiers) do @@ -85,7 +72,7 @@ defmodule Domain.Auth.Identity.Sync do {:ok, {insert, update, delete}} end - defp delete_identities(repo, provider, provider_identifiers_to_delete) do + defp delete_identities(provider, provider_identifiers_to_delete) do provider_identifiers_to_delete = Enum.uniq(provider_identifiers_to_delete) {_count, identities} = @@ -94,7 +81,7 @@ defmodule Domain.Auth.Identity.Sync do |> Identity.Query.by_provider_id(provider.id) |> Identity.Query.by_provider_identifier({:in, provider_identifiers_to_delete}) |> Identity.Query.delete() - |> repo.update_all([]) + |> Repo.update_all([]) :ok = Enum.each(identities, fn identity -> @@ -104,19 +91,14 @@ defmodule Domain.Auth.Identity.Sync do {:ok, identities} end - defp insert_identities( - repo, - provider, - attrs_by_provider_identifier, - provider_identifiers_to_insert - ) do + defp insert_identities(provider, attrs_by_provider_identifier, provider_identifiers_to_insert) do provider_identifiers_to_insert |> Enum.uniq() |> Enum.reduce_while({:ok, []}, fn provider_identifier, {:ok, acc} -> attrs = Map.get(attrs_by_provider_identifier, provider_identifier) changeset = Identity.Changeset.create_identity_and_actor(provider, attrs) - case repo.insert(changeset) do + case Repo.insert(changeset) do {:ok, identity} -> {:cont, {:ok, [identity | acc]}} @@ -127,7 +109,6 @@ defmodule Domain.Auth.Identity.Sync do end defp update_identities_and_actors( - repo, identities, attrs_by_provider_identifier, provider_identifiers_to_update @@ -137,7 +118,7 @@ defmodule Domain.Auth.Identity.Sync do |> Enum.filter(fn identity -> identity.provider_identifier in provider_identifiers_to_update end) - |> repo.preload(:actor) + |> Repo.preload(:actor) |> Enum.reduce(%{}, fn identity, acc -> acc_identity = Map.get(acc, identity.provider_identifier) @@ -161,7 +142,7 @@ defmodule Domain.Auth.Identity.Sync do attrs = Map.get(attrs_by_provider_identifier, provider_identifier) changeset = Identity.Changeset.update_identity_and_actor(identity, attrs) - case repo.update(changeset) do + case Repo.update(changeset) do {:ok, identity} -> {:cont, {:ok, [identity | acc]}} diff --git a/elixir/apps/domain/lib/domain/auth/provider/query.ex b/elixir/apps/domain/lib/domain/auth/provider/query.ex index 9d9382f57..6ac1566a8 100644 --- a/elixir/apps/domain/lib/domain/auth/provider/query.ex +++ b/elixir/apps/domain/lib/domain/auth/provider/query.ex @@ -56,6 +56,10 @@ defmodule Domain.Auth.Provider.Query do |> where([providers: providers], is_nil(providers.sync_disabled_at)) end + def order_by_sync_priority(queryable) do + order_by(queryable, [providers: providers], asc_nulls_first: providers.last_synced_at) + end + def by_non_empty_refresh_token(queryable) do where( queryable, diff --git a/elixir/apps/domain/lib/domain/billing.ex b/elixir/apps/domain/lib/domain/billing.ex index 14b91f535..8da59ba20 100644 --- a/elixir/apps/domain/lib/domain/billing.ex +++ b/elixir/apps/domain/lib/domain/billing.ex @@ -14,7 +14,7 @@ defmodule Domain.Billing do def init(_opts) do children = [ APIClient, - {Domain.Jobs, Jobs} + Jobs.CheckAccountLimits ] if enabled?() do diff --git a/elixir/apps/domain/lib/domain/billing/jobs.ex b/elixir/apps/domain/lib/domain/billing/jobs/check_account_limits.ex similarity index 92% rename from elixir/apps/domain/lib/domain/billing/jobs.ex rename to elixir/apps/domain/lib/domain/billing/jobs/check_account_limits.ex index 479e5a293..7199125ec 100644 --- a/elixir/apps/domain/lib/domain/billing/jobs.ex +++ b/elixir/apps/domain/lib/domain/billing/jobs/check_account_limits.ex @@ -1,8 +1,13 @@ -defmodule Domain.Billing.Jobs do - use Domain.Jobs.Recurrent, otp_app: :domain +defmodule Domain.Billing.Jobs.CheckAccountLimits do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.GloballyUnique + alias Domain.{Accounts, Billing, Actors, Clients, Gateways} - every minutes(30), :check_account_limits do + @impl true + def execute(_config) do Accounts.all_active_accounts!() |> Enum.each(fn account -> if Billing.enabled?() and Billing.account_provisioned?(account) do diff --git a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex index 234e4ed31..ed5cc74b7 100644 --- a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex +++ b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex @@ -173,7 +173,7 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do discovered_nodes_count: count }) - Logger.debug("Found #{count}", module: __MODULE__, nodes: Enum.join(nodes, ", ")) + Logger.debug("Found #{count} nodes", module: __MODULE__, nodes: Enum.join(nodes, ", ")) {:ok, nodes} end diff --git a/elixir/apps/domain/lib/domain/jobs.ex b/elixir/apps/domain/lib/domain/jobs.ex deleted file mode 100644 index f5dc3e21c..000000000 --- a/elixir/apps/domain/lib/domain/jobs.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Domain.Jobs do - @moduledoc """ - This module starts all recurrent job handlers defined by a module - in individual processes and supervises them. - """ - use Supervisor - - def start_link(module) do - Supervisor.start_link(__MODULE__, module) - end - - def init(module) do - config = module.__config__() - - children = - Enum.flat_map(module.__handlers__(), fn {name, interval} -> - handler_config = Keyword.get(config, name, []) - - if Keyword.get(config, :enabled, true) and Keyword.get(handler_config, :enabled, true) do - [ - Supervisor.child_spec( - {Domain.Jobs.Executors.Global, {{module, name}, interval, handler_config}}, - id: {module, name} - ) - ] - else - [] - end - end) - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/elixir/apps/domain/lib/domain/jobs/executors/concurrent.ex b/elixir/apps/domain/lib/domain/jobs/executors/concurrent.ex new file mode 100644 index 000000000..cfc23c9ce --- /dev/null +++ b/elixir/apps/domain/lib/domain/jobs/executors/concurrent.ex @@ -0,0 +1,119 @@ +defmodule Domain.Jobs.Executors.Concurrent do + @moduledoc """ + This module starts a GenServer that executes a callback function + on a given interval on each node that runs the jobs. This means + concurrency control should be implemented in the callback function + itself (eg. by using `SELECT ... FOR UPDATE SKIP LOCKED`) or + by using advisory locks (see `reject_locked/2`). + + If you need globally unique jobs see `Domain.Jobs.Executors.GloballyUnique`. + """ + use GenServer + require Logger + require OpenTelemetry.Tracer + + @doc """ + Initializes the worker state. + """ + @callback state(config :: term()) :: {:ok, state :: term()} + + @doc """ + Executes the callback function with the state created in `c:state/1`. + """ + @callback execute(state :: term()) :: :ok + + def start_link({module, interval, config}) do + GenServer.start_link(__MODULE__, {module, interval, config}) + end + + @impl true + def init({module, interval, config}) do + initial_delay = Keyword.get(config, :initial_delay, 0) + + with {:ok, worker_state} <- module.state(config) do + {:ok, {module, worker_state, interval}, initial_delay} + end + end + + @impl true + def handle_info(:timeout, {_module, _worker_state_, interval} = state) do + :ok = schedule_tick(interval) + {:noreply, state} + end + + @impl true + def handle_info(:tick, {module, worker_state, interval} = state) do + :ok = execute_handler(module, worker_state) + :ok = schedule_tick(interval) + {:noreply, state} + end + + # tick is scheduled by using a timeout message instead of `:timer.send_interval/2`, + # because we don't want jobs to overlap if they take too long to execute + defp schedule_tick(interval) do + _ = Process.send_after(self(), :tick, interval) + :ok + end + + defp execute_handler(module, worker_state) do + job_callback = "#{module}.execute/2" + + attributes = [ + job_runner: __MODULE__, + job_execution_id: Ecto.UUID.generate(), + job_callback: job_callback + ] + + Logger.metadata(attributes) + + OpenTelemetry.Tracer.with_span job_callback, attributes: attributes do + _ = module.execute(worker_state) + end + + :ok + end + + @doc """ + A helper function that acquires an exclusive transaction-level advisory lock for each given row in the given table + and returns only the rows that were successfully locked. + + This function is useful when you want to ensure that only one process is working on a given + row(s) at a time, without using actual row-level locks that can cause deadlocks and timeouts + for long-running transactions (like IdP syncs). + + Execution of this function should be wrapped in a transaction block (eg. `Ecto.Repo.transaction/2`), + the locks are released when the transaction is committed or rolled back. + + ## Implementation notes + + Postgres allows to either use one `bigint` or two-`int` for advisory locks, + we use the latter to avoid contention on a single value between processed tables + by using the oid of the table as as first of the lock arguments. + + The lock is acquired by the `id` (second `int`) of the row but since our ids are UUIDs we also + hash them to fit into `int` range, this opens a possibility of hash collisions but a negligible + trade-off since the chances of a collision is very low and jobs will be restarted anyways only + delaying their execution. + + `mod/2` is used to roll over the hash value to fit into the `int` range since `hashtext/1` return + can change between Postgres versions. + """ + def reject_locked(table, rows) do + ids = Enum.map(rows, & &1.id) + + %Postgrex.Result{rows: not_locked_ids} = + Ecto.Adapters.SQL.query!( + Domain.Repo, + """ + SELECT id + FROM unnest($1::text[]) AS t(id) + WHERE pg_try_advisory_xact_lock(($2::text)::regclass::oid::int, mod(hashtext(t.id), 2147483647)::int) + """, + [ids, table] + ) + + not_locked_ids = Enum.map(not_locked_ids, fn [id] -> id end) + + Enum.filter(rows, fn row -> row.id in not_locked_ids end) + end +end diff --git a/elixir/apps/domain/lib/domain/jobs/executors/global.ex b/elixir/apps/domain/lib/domain/jobs/executors/globally_unique.ex similarity index 74% rename from elixir/apps/domain/lib/domain/jobs/executors/global.ex rename to elixir/apps/domain/lib/domain/jobs/executors/globally_unique.ex index 4052392a8..4beff27a1 100644 --- a/elixir/apps/domain/lib/domain/jobs/executors/global.ex +++ b/elixir/apps/domain/lib/domain/jobs/executors/globally_unique.ex @@ -1,4 +1,4 @@ -defmodule Domain.Jobs.Executors.Global do +defmodule Domain.Jobs.Executors.GloballyUnique do @moduledoc """ This module is an abstraction on top of a GenServer that executes a callback function on a given interval on a globally unique process in an Erlang cluster. @@ -50,13 +50,18 @@ defmodule Domain.Jobs.Executors.Global do require Logger require OpenTelemetry.Tracer - def start_link({{module, function}, interval, config}) do - GenServer.start_link(__MODULE__, {{module, function}, interval, config}) + @doc """ + Executes the callback function with the given configuration. + """ + @callback execute(config :: term()) :: :ok + + def start_link({module, interval, config}) do + GenServer.start_link(__MODULE__, {module, interval, config}) end @impl true - def init({{module, function}, interval, config}) do - name = global_name(module, function) + def init({module, interval, config}) do + name = global_name(module) # `random_notify_name` is used to avoid name conflicts in a cluster during deployments and # network splits, it randomly selects one of the duplicate pids for registration, @@ -66,52 +71,40 @@ defmodule Domain.Jobs.Executors.Global do pid when is_pid(pid) <- :global.whereis_name(name) do # we monitor the leader process so that we start a race to become a new leader when it's down monitor_ref = Process.monitor(pid) - {:ok, {{{module, function}, interval, config}, {:fallback, pid, monitor_ref}}, :hibernate} + {:ok, {{module, interval, config}, {:fallback, pid, monitor_ref}}, :hibernate} else :yes -> - Logger.debug("Recurrent job will be handled on this node", - module: module, - function: function - ) - + Logger.debug("Recurrent job will be handled on this node", module: module) initial_delay = Keyword.get(config, :initial_delay, 0) - {:ok, {{{module, function}, interval, config}, :leader}, initial_delay} + {:ok, {{module, interval, config}, :leader}, initial_delay} :undefined -> - Logger.warning("Recurrent job leader exists but is not yet available", - module: module, - function: function - ) - + Logger.warning("Recurrent job leader exists but is not yet available", module: module) _timer_ref = :timer.sleep(100) init(module) end end @impl true - def handle_info(:timeout, {{{_module, _name}, interval, _config}, :leader} = state) do + def handle_info(:timeout, {{_module, interval, _config}, :leader} = state) do :ok = schedule_tick(interval) {:noreply, state} end @impl true def handle_info( - {:global_name_conflict, {__MODULE__, module, function}}, - {{{module, function}, interval, config}, _leader_or_fallback} = state + {:global_name_conflict, {__MODULE__, module}}, + {{module, interval, config}, _leader_or_fallback} = state ) do - name = global_name(module, function) + name = global_name(module) with pid when is_pid(pid) <- :global.whereis_name(name) do monitor_ref = Process.monitor(pid) - state = {{{module, function}, interval, config}, {:fallback, pid, monitor_ref}} + state = {{module, interval, config}, {:fallback, pid, monitor_ref}} {:noreply, state, :hibernate} else :undefined -> - Logger.warning("Recurrent job name conflict", - module: module, - function: function - ) - + Logger.warning("Recurrent job name conflict", module: module) _timer_ref = :timer.sleep(100) handle_info({:global_name_conflict, module}, state) end @@ -119,7 +112,7 @@ defmodule Domain.Jobs.Executors.Global do def handle_info( {:DOWN, _ref, :process, _pid, reason}, - {{{module, function}, interval, config}, {:fallback, pid, _monitor_ref}} + {{module, interval, config}, {:fallback, pid, _monitor_ref}} ) do # Solves the "Retry Storm" antipattern backoff_with_jitter = :rand.uniform(200) - 1 @@ -127,12 +120,11 @@ defmodule Domain.Jobs.Executors.Global do Logger.info("Recurrent job leader is down", module: module, - function: function, leader_pid: inspect(pid), leader_exit_reason: inspect(reason, pretty: true) ) - case init({{module, function}, interval, config}) do + case init({module, interval, config}) do {:ok, state, :hibernate} -> {:noreply, state, :hibernate} {:ok, state, _initial_delay} -> {:noreply, state, 0} end @@ -143,8 +135,8 @@ defmodule Domain.Jobs.Executors.Global do {:noreply, state} end - def handle_info(:tick, {{{module, function}, interval, config}, :leader} = state) do - :ok = execute_handler(module, function, config) + def handle_info(:tick, {{module, interval, config}, :leader} = state) do + :ok = execute_handler(module, config) :ok = schedule_tick(interval) {:noreply, state} end @@ -156,8 +148,8 @@ defmodule Domain.Jobs.Executors.Global do :ok end - defp execute_handler(module, function, config) do - job_callback = "#{module}.#{function}/1" + defp execute_handler(module, config) do + job_callback = "#{module}.execute/1" attributes = [ job_runner: __MODULE__, @@ -168,11 +160,11 @@ defmodule Domain.Jobs.Executors.Global do Logger.metadata(attributes) OpenTelemetry.Tracer.with_span job_callback, attributes: attributes do - _ = apply(module, function, [config]) + _ = module.execute(config) end :ok end - defp global_name(module, function), do: {__MODULE__, module, function} + defp global_name(module), do: {__MODULE__, module} end diff --git a/elixir/apps/domain/lib/domain/jobs/job.ex b/elixir/apps/domain/lib/domain/jobs/job.ex new file mode 100644 index 000000000..78e7735e4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/jobs/job.ex @@ -0,0 +1,27 @@ +defmodule Domain.Jobs.Job do + defmacro __using__(opts) do + quote bind_quoted: [opts: opts, location: :keep] do + @otp_app Keyword.fetch!(opts, :otp_app) + @interval Keyword.fetch!(opts, :every) + @executor Keyword.fetch!(opts, :executor) + + @behaviour @executor + + def child_spec(_opts) do + config = __config__() + + if Keyword.get(config, :enabled, true) do + Supervisor.child_spec({@executor, {__MODULE__, @interval, config}}, id: __MODULE__) + else + :ignore + end + end + + @doc """ + Returns the configuration for the job that is defined in the application environment. + """ + @spec __config__() :: Keyword.t() + def __config__, do: Domain.Config.get_env(@otp_app, __MODULE__, []) + end + end +end diff --git a/elixir/apps/domain/lib/domain/jobs/recurrent.ex b/elixir/apps/domain/lib/domain/jobs/recurrent.ex deleted file mode 100644 index 253e0f0c3..000000000 --- a/elixir/apps/domain/lib/domain/jobs/recurrent.ex +++ /dev/null @@ -1,56 +0,0 @@ -defmodule Domain.Jobs.Recurrent do - @doc """ - This module provides a DSL to define recurrent jobs that run on an time interval basis. - """ - defmacro __using__(opts) do - quote bind_quoted: [opts: opts] do - import Domain.Jobs.Recurrent - - # Accumulate handlers and define a `__handlers__/0` function to list them - @before_compile Domain.Jobs.Recurrent - Module.register_attribute(__MODULE__, :handlers, accumulate: true) - - # Will read the config from the application environment - @otp_app Keyword.fetch!(opts, :otp_app) - @spec __config__() :: Keyword.t() - def __config__, do: Domain.Config.get_env(@otp_app, __MODULE__, []) - end - end - - defmacro __before_compile__(_env) do - quote do - @spec __handlers__() :: [{atom(), pos_integer()}] - def __handlers__, do: @handlers - end - end - - @doc """ - Defines a code to execute every `interval` milliseconds. - - Is it recommended to use `seconds/1`, `minutes/1` macros to define the interval. - - Behind the hood it defines a function `execute(name, interval, do: ..)` and adds it's name to the - module attribute. - """ - defmacro every(interval, name, do: block) do - quote do - @handlers {unquote(name), unquote(interval)} - def unquote(name)(unquote(Macro.var(:_config, nil))), do: unquote(block) - end - end - - defmacro every(interval, name, config, do: block) do - quote do - @handlers {unquote(name), unquote(interval)} - def unquote(name)(unquote(config)), do: unquote(block) - end - end - - def seconds(num) do - :timer.seconds(num) - end - - def minutes(num) do - :timer.minutes(num) - end -end diff --git a/elixir/apps/domain/lib/domain/telemetry.ex b/elixir/apps/domain/lib/domain/telemetry.ex index 277caed69..d62325e1f 100644 --- a/elixir/apps/domain/lib/domain/telemetry.ex +++ b/elixir/apps/domain/lib/domain/telemetry.ex @@ -77,7 +77,18 @@ defmodule Domain.Telemetry do # Application metrics last_value("domain.relays.online_relays_count"), - last_value("domain.metrics.discovered_nodes_count") + last_value("domain.metrics.discovered_nodes_count"), + + ## Directory Syncs + summary("domain.directory_sync.data_fetch_total_time", + tags: [:account_id, :provider_id, :provider_adapter] + ), + summary("domain.directory_sync.db_operations_total_time", + tags: [:account_id, :provider_id, :provider_adapter] + ), + distribution("domain.directory_sync.total_time", + tags: [:account_id, :provider_id, :provider_adapter] + ) ] end diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index 48b146bcc..5bd2840c3 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -12,7 +12,7 @@ defmodule Domain.Tokens do @impl true def init(_init_arg) do children = [ - {Domain.Jobs, Jobs} + Jobs.DeleteExpiredTokens ] Supervisor.init(children, strategy: :one_for_one) diff --git a/elixir/apps/domain/lib/domain/tokens/jobs.ex b/elixir/apps/domain/lib/domain/tokens/jobs.ex deleted file mode 100644 index e29f69449..000000000 --- a/elixir/apps/domain/lib/domain/tokens/jobs.ex +++ /dev/null @@ -1,10 +0,0 @@ -defmodule Domain.Tokens.Jobs do - use Domain.Jobs.Recurrent, otp_app: :domain - alias Domain.Tokens - require Logger - - every minutes(5), :delete_expired_tokens do - {:ok, _count} = Tokens.delete_expired_tokens() - :ok - end -end diff --git a/elixir/apps/domain/lib/domain/tokens/jobs/delete_expired_tokens.ex b/elixir/apps/domain/lib/domain/tokens/jobs/delete_expired_tokens.ex new file mode 100644 index 000000000..e67b18159 --- /dev/null +++ b/elixir/apps/domain/lib/domain/tokens/jobs/delete_expired_tokens.ex @@ -0,0 +1,15 @@ +defmodule Domain.Tokens.Jobs.DeleteExpiredTokens do + use Domain.Jobs.Job, + otp_app: :domain, + every: :timer.minutes(5), + executor: Domain.Jobs.Executors.GloballyUnique + + alias Domain.Tokens + require Logger + + @impl true + def execute(_config) do + {:ok, _count} = Tokens.delete_expired_tokens() + :ok + end +end diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 900c4c2fa..80d0180c2 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -585,7 +585,7 @@ defmodule Domain.ActorsTest do end end - describe "sync_provider_groups_multi/2" do + describe "sync_provider_groups/2" do setup do account = Fixtures.Accounts.create_account() @@ -601,15 +601,13 @@ defmodule Domain.ActorsTest do %{"name" => "OrgUnit:Engineering", "provider_identifier" => "OU:OU_ID1"} ] - multi = sync_provider_groups_multi(provider, attrs_list) - assert {:ok, %{ - plan_groups: {upsert, []}, - delete_groups: [], - upsert_groups: [_group1, _group2], + plan: {upsert, []}, + deleted: [], + upserted: [_group1, _group2], group_ids_by_provider_identifier: group_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_groups(provider, attrs_list) assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in upsert)) groups = Repo.all(Actors.Group) @@ -651,15 +649,13 @@ defmodule Domain.ActorsTest do %{"name" => "OrgUnit:Engineering", "provider_identifier" => "OU:OU_ID1"} ] - multi = sync_provider_groups_multi(provider, attrs_list) - assert {:ok, %{ - plan_groups: {upsert, []}, - delete_groups: [], - upsert_groups: [_group1, _group2], + plan: {upsert, []}, + deleted: [], + upserted: [_group1, _group2], group_ids_by_provider_identifier: group_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_groups(provider, attrs_list) assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in upsert)) assert Repo.aggregate(Actors.Group, :count) == 2 @@ -719,16 +715,14 @@ defmodule Domain.ActorsTest do attrs_list = [] - multi = sync_provider_groups_multi(provider, attrs_list) - assert {:ok, %{ groups: [_group1, _group2], - plan_groups: {[], delete}, - delete_groups: [deleted_group1, deleted_group2], - upsert_groups: [], + plan: {[], delete}, + deleted: [deleted_group1, deleted_group2], + upserted: [], group_ids_by_provider_identifier: group_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_groups(provider, attrs_list) assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in delete)) assert deleted_group1.provider_identifier in ["G:GROUP_ID1", "OU:OU_ID1"] @@ -772,21 +766,19 @@ defmodule Domain.ActorsTest do attrs_list = [] - multi = sync_provider_groups_multi(provider, attrs_list) - - assert Repo.transaction(multi) == + assert sync_provider_groups(provider, attrs_list) == {:ok, %{ groups: [], - plan_groups: {[], []}, - delete_groups: [], - upsert_groups: [], + plan: {[], []}, + deleted: [], + upserted: [], group_ids_by_provider_identifier: %{} }} end end - describe "sync_provider_memberships_multi/2" do + describe "sync_provider_memberships/2" do setup do account = Fixtures.Accounts.create_account() @@ -870,12 +862,6 @@ defmodule Domain.ActorsTest do group2.provider_identifier => group2.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - :ok = subscribe_to_membership_updates_for_actor(actor1) :ok = subscribe_to_membership_updates_for_actor(actor2) :ok = Domain.Policies.subscribe_to_events_for_actor(actor1) @@ -885,10 +871,16 @@ defmodule Domain.ActorsTest do assert {:ok, %{ - plan_memberships: {insert, []}, - delete_memberships: {0, nil}, - insert_memberships: [_membership1, _membership2] - }} = Repo.transaction(multi) + plan: {insert, []}, + deleted_stats: {0, nil}, + inserted: [_membership1, _membership2] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert {group1.id, identity1.actor_id} in insert assert {group2.id, identity2.actor_id} in insert @@ -941,12 +933,6 @@ defmodule Domain.ActorsTest do group2.provider_identifier => group2.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - :ok = subscribe_to_membership_updates_for_actor(identity1.actor_id) :ok = subscribe_to_membership_updates_for_actor(identity2.actor_id) :ok = Domain.Policies.subscribe_to_events_for_actor(identity1.actor_id) @@ -956,10 +942,16 @@ defmodule Domain.ActorsTest do assert {:ok, %{ - plan_memberships: {[], []}, - delete_memberships: {0, nil}, - insert_memberships: [] - }} = Repo.transaction(multi) + plan: {[], []}, + deleted_stats: {0, nil}, + inserted: [] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert Repo.aggregate(Actors.Membership, :count) == 2 assert Repo.aggregate(Actors.Membership.Query.all(), :count) == 2 @@ -1021,12 +1013,6 @@ defmodule Domain.ActorsTest do group2.provider_identifier => group2.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - :ok = subscribe_to_membership_updates_for_actor(identity1.actor_id) :ok = subscribe_to_membership_updates_for_actor(identity2.actor_id) :ok = Domain.Policies.subscribe_to_events_for_actor(identity1.actor_id) @@ -1037,10 +1023,16 @@ defmodule Domain.ActorsTest do assert {:ok, %{ - plan_memberships: {[], delete}, - delete_memberships: {2, nil}, - insert_memberships: [] - }} = Repo.transaction(multi) + plan: {[], delete}, + deleted_stats: {2, nil}, + inserted: [] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert {group1.id, identity1.actor_id} in delete assert {group2.id, identity2.actor_id} in delete @@ -1095,18 +1087,18 @@ defmodule Domain.ActorsTest do group1.provider_identifier => group1.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - assert {:ok, %{ - plan_memberships: {[], delete}, - insert_memberships: [], - delete_memberships: {1, nil} - }} = Repo.transaction(multi) + plan: {[], delete}, + inserted: [], + deleted_stats: {1, nil} + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert delete == [{group2.id, identity2.actor_id}] @@ -1136,18 +1128,18 @@ defmodule Domain.ActorsTest do group2.provider_identifier => group2.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - assert {:ok, %{ - plan_memberships: {[], []}, - delete_memberships: {0, nil}, - insert_memberships: [] - }} = Repo.transaction(multi) + plan: {[], []}, + deleted_stats: {0, nil}, + inserted: [] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) end test "deletes actors that are not processed by identity sync", %{ @@ -1182,18 +1174,18 @@ defmodule Domain.ActorsTest do group2.provider_identifier => group2.id } - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - assert {:ok, %{ - plan_memberships: {[], delete}, - delete_memberships: {2, nil}, - insert_memberships: [] - }} = Repo.transaction(multi) + plan: {[], delete}, + deleted_stats: {2, nil}, + inserted: [] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert {group1.id, identity1.actor_id} in delete assert {group2.id, identity2.actor_id} in delete @@ -1231,18 +1223,18 @@ defmodule Domain.ActorsTest do group_ids_by_provider_identifier = %{} - multi = - Ecto.Multi.new() - |> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier) - |> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier) - |> sync_provider_memberships_multi(provider, tuples_list) - assert {:ok, %{ - plan_memberships: {[], delete}, - delete_memberships: {2, nil}, - insert_memberships: [] - }} = Repo.transaction(multi) + plan: {[], delete}, + deleted_stats: {2, nil}, + inserted: [] + }} = + sync_provider_memberships( + actor_ids_by_provider_identifier, + group_ids_by_provider_identifier, + provider, + tuples_list + ) assert {group1.id, identity1.actor_id} in delete assert {group2.id, identity2.actor_id} in delete diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens_test.exs new file mode 100644 index 000000000..7bdb1603a --- /dev/null +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/refresh_access_tokens_test.exs @@ -0,0 +1,81 @@ +defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokensTest do + use Domain.DataCase, async: true + import Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokens + + describe "execute/1" do + setup do + account = Fixtures.Accounts.create_account() + + {provider, bypass} = + Fixtures.Auth.start_and_create_google_workspace_provider(account: account) + + provider = + Domain.Fixture.update!(provider, %{ + adapter_state: %{ + "access_token" => "OIDC_ACCESS_TOKEN", + "refresh_token" => "OIDC_REFRESH_TOKEN", + "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), + "claims" => "openid email profile offline_access" + } + }) + + identity = Fixtures.Auth.create_identity(account: account, provider: provider) + + %{ + bypass: bypass, + account: account, + provider: provider, + identity: identity + } + end + + test "refreshes the access token", %{ + provider: provider, + identity: identity, + bypass: bypass + } do + {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) + + Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ + "token_type" => "Bearer", + "id_token" => token, + "access_token" => "MY_ACCESS_TOKEN", + "refresh_token" => "OTHER_REFRESH_TOKEN", + "expires_in" => nil + }) + + Mocks.OpenIDConnect.expect_userinfo(bypass) + + assert execute(%{}) == :ok + + provider = Repo.get!(Domain.Auth.Provider, provider.id) + + assert %{ + "access_token" => "MY_ACCESS_TOKEN", + "claims" => ^claims, + "expires_at" => expires_at, + "refresh_token" => "OIDC_REFRESH_TOKEN", + "userinfo" => %{ + "email" => "ada@example.com", + "email_verified" => true, + "family_name" => "Lovelace", + "given_name" => "Ada", + "locale" => "en", + "name" => "Ada Lovelace", + "picture" => + "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", + "sub" => "353690423699814251281" + } + } = provider.adapter_state + + assert expires_at + end + + test "does not crash when endpoint it not available", %{ + bypass: bypass + } do + Bypass.down(bypass) + assert execute(%{}) == :ok + end + end +end diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs similarity index 89% rename from elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs_test.exs rename to elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index 3ec5c4b5c..c270c5bd1 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -1,87 +1,10 @@ -defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do +defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true alias Domain.{Auth, Actors} alias Domain.Mocks.GoogleWorkspaceDirectory - import Domain.Auth.Adapters.GoogleWorkspace.Jobs + import Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory - describe "refresh_access_tokens/1" do - setup do - account = Fixtures.Accounts.create_account() - - {provider, bypass} = - Fixtures.Auth.start_and_create_google_workspace_provider(account: account) - - provider = - Domain.Fixture.update!(provider, %{ - adapter_state: %{ - "access_token" => "OIDC_ACCESS_TOKEN", - "refresh_token" => "OIDC_REFRESH_TOKEN", - "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), - "claims" => "openid email profile offline_access" - } - }) - - identity = Fixtures.Auth.create_identity(account: account, provider: provider) - - %{ - bypass: bypass, - account: account, - provider: provider, - identity: identity - } - end - - test "refreshes the access token", %{ - provider: provider, - identity: identity, - bypass: bypass - } do - {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) - - Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ - "token_type" => "Bearer", - "id_token" => token, - "access_token" => "MY_ACCESS_TOKEN", - "refresh_token" => "OTHER_REFRESH_TOKEN", - "expires_in" => nil - }) - - Mocks.OpenIDConnect.expect_userinfo(bypass) - - assert refresh_access_tokens(%{}) == :ok - - provider = Repo.get!(Domain.Auth.Provider, provider.id) - - assert %{ - "access_token" => "MY_ACCESS_TOKEN", - "claims" => ^claims, - "expires_at" => expires_at, - "refresh_token" => "OIDC_REFRESH_TOKEN", - "userinfo" => %{ - "email" => "ada@example.com", - "email_verified" => true, - "family_name" => "Lovelace", - "given_name" => "Ada", - "locale" => "en", - "name" => "Ada Lovelace", - "picture" => - "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", - "sub" => "353690423699814251281" - } - } = provider.adapter_state - - assert expires_at - end - - test "does not crash when endpoint it not available", %{ - bypass: bypass - } do - Bypass.down(bypass) - assert refresh_access_tokens(%{}) == :ok - end - end - - describe "sync_directory/1" do + describe "execute/1" do setup do account = Fixtures.Accounts.create_account() @@ -98,7 +21,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do test "returns error when IdP sync is not enabled", %{account: account, provider: provider} do {:ok, _account} = Domain.Accounts.update_account(account, %{features: %{idp_sync: false}}) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -254,7 +178,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, group["id"], members) end) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok groups = Actors.Group |> Repo.all() assert length(groups) == 2 @@ -300,7 +225,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do Bypass.down(bypass) GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert Repo.aggregate(Actors.Group, :count) == 0 end @@ -343,7 +269,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do GoogleWorkspaceDirectory.mock_organization_units_list_endpoint(bypass, []) GoogleWorkspaceDirectory.mock_users_list_endpoint(bypass, users) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_identity = Repo.get(Domain.Auth.Identity, identity.id) @@ -574,7 +501,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, "GROUP_ID1", two_members) GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, "GROUP_ID2", one_member) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_group = Repo.get(Domain.Actors.Group, group.id) assert updated_group.name == "Group:Infrastructure" @@ -704,7 +632,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do end) end - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -725,7 +654,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do end) end - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -791,7 +721,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do end) end - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens_test.exs new file mode 100644 index 000000000..11e6186ed --- /dev/null +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/refresh_access_tokens_test.exs @@ -0,0 +1,81 @@ +defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokensTest do + use Domain.DataCase, async: true + import Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokens + + describe "execute/1" do + setup do + account = Fixtures.Accounts.create_account() + + {provider, bypass} = + Fixtures.Auth.start_and_create_microsoft_entra_provider(account: account) + + provider = + Domain.Fixture.update!(provider, %{ + adapter_state: %{ + "access_token" => "OIDC_ACCESS_TOKEN", + "refresh_token" => "OIDC_REFRESH_TOKEN", + "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), + "claims" => "openid email profile offline_access" + } + }) + + identity = Fixtures.Auth.create_identity(account: account, provider: provider) + + %{ + bypass: bypass, + account: account, + provider: provider, + identity: identity + } + end + + test "refreshes the access token", %{ + provider: provider, + identity: identity, + bypass: bypass + } do + {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) + + Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ + "token_type" => "Bearer", + "id_token" => token, + "access_token" => "MY_ACCESS_TOKEN", + "refresh_token" => "OTHER_REFRESH_TOKEN", + "expires_in" => nil + }) + + Mocks.OpenIDConnect.expect_userinfo(bypass) + + assert execute(%{}) == :ok + + provider = Repo.get!(Domain.Auth.Provider, provider.id) + + assert %{ + "access_token" => "MY_ACCESS_TOKEN", + "claims" => ^claims, + "expires_at" => expires_at, + "refresh_token" => "OIDC_REFRESH_TOKEN", + "userinfo" => %{ + "email" => "ada@example.com", + "email_verified" => true, + "family_name" => "Lovelace", + "given_name" => "Ada", + "locale" => "en", + "name" => "Ada Lovelace", + "picture" => + "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", + "sub" => "353690423699814251281" + } + } = provider.adapter_state + + assert expires_at + end + + test "does not crash when endpoint it not available", %{ + bypass: bypass + } do + Bypass.down(bypass) + assert execute(%{}) == :ok + end + end +end diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs similarity index 84% rename from elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs_test.exs rename to elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index 9c2e8ed16..43cbc02c2 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -1,87 +1,10 @@ -defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do +defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true alias Domain.{Auth, Actors} alias Domain.Mocks.MicrosoftEntraDirectory - import Domain.Auth.Adapters.MicrosoftEntra.Jobs + import Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory - describe "refresh_access_tokens/1" do - setup do - account = Fixtures.Accounts.create_account() - - {provider, bypass} = - Fixtures.Auth.start_and_create_microsoft_entra_provider(account: account) - - provider = - Domain.Fixture.update!(provider, %{ - adapter_state: %{ - "access_token" => "OIDC_ACCESS_TOKEN", - "refresh_token" => "OIDC_REFRESH_TOKEN", - "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), - "claims" => "openid email profile offline_access" - } - }) - - identity = Fixtures.Auth.create_identity(account: account, provider: provider) - - %{ - bypass: bypass, - account: account, - provider: provider, - identity: identity - } - end - - test "refreshes the access token", %{ - provider: provider, - identity: identity, - bypass: bypass - } do - {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) - - Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ - "token_type" => "Bearer", - "id_token" => token, - "access_token" => "MY_ACCESS_TOKEN", - "refresh_token" => "OTHER_REFRESH_TOKEN", - "expires_in" => nil - }) - - Mocks.OpenIDConnect.expect_userinfo(bypass) - - assert refresh_access_tokens(%{}) == :ok - - provider = Repo.get!(Domain.Auth.Provider, provider.id) - - assert %{ - "access_token" => "MY_ACCESS_TOKEN", - "claims" => ^claims, - "expires_at" => expires_at, - "refresh_token" => "OIDC_REFRESH_TOKEN", - "userinfo" => %{ - "email" => "ada@example.com", - "email_verified" => true, - "family_name" => "Lovelace", - "given_name" => "Ada", - "locale" => "en", - "name" => "Ada Lovelace", - "picture" => - "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", - "sub" => "353690423699814251281" - } - } = provider.adapter_state - - assert expires_at - end - - test "does not crash when endpoint it not available", %{ - bypass: bypass - } do - Bypass.down(bypass) - assert refresh_access_tokens(%{}) == :ok - end - end - - describe "sync_directory/1" do + describe "execute/1" do setup do account = Fixtures.Accounts.create_account() @@ -98,7 +21,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do test "returns error when IdP sync is not enabled", %{account: account, provider: provider} do {:ok, _account} = Domain.Accounts.update_account(account, %{features: %{idp_sync: false}}) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -160,7 +84,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do MicrosoftEntraDirectory.mock_group_members_list_endpoint(bypass, group["id"], members) end) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok groups = Actors.Group |> Repo.all() assert length(groups) == 2 @@ -206,7 +131,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do Bypass.down(bypass) MicrosoftEntraDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert Repo.aggregate(Actors.Group, :count) == 0 end @@ -240,7 +166,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do MicrosoftEntraDirectory.mock_groups_list_endpoint(bypass, []) MicrosoftEntraDirectory.mock_users_list_endpoint(bypass, users) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_identity = Repo.get(Domain.Auth.Identity, identity.id) @@ -407,7 +334,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do one_member ) - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_group = Repo.get(Domain.Actors.Group, group.id) assert updated_group.name == "Group:All" @@ -506,7 +434,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do end) end - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -522,7 +451,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do end) end - assert sync_directory(%{}) == :ok + {:ok, pid} = Task.Supervisor.start_link() + assert execute(%{task_supervisor: pid}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/refresh_access_tokens_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/refresh_access_tokens_test.exs new file mode 100644 index 000000000..4bd60ece9 --- /dev/null +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/refresh_access_tokens_test.exs @@ -0,0 +1,81 @@ +defmodule Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokensTest do + use Domain.DataCase, async: true + import Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokens + + describe "execute/1" do + setup do + account = Fixtures.Accounts.create_account() + + {provider, bypass} = + Fixtures.Auth.start_and_create_okta_provider(account: account) + + provider = + Domain.Fixture.update!(provider, %{ + adapter_state: %{ + "access_token" => "OIDC_ACCESS_TOKEN", + "refresh_token" => "OIDC_REFRESH_TOKEN", + "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), + "claims" => "openid email profile offline_access" + } + }) + + identity = Fixtures.Auth.create_identity(account: account, provider: provider) + + %{ + bypass: bypass, + account: account, + provider: provider, + identity: identity + } + end + + test "refreshes the access token", %{ + provider: provider, + identity: identity, + bypass: bypass + } do + {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) + + Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ + "token_type" => "Bearer", + "id_token" => token, + "access_token" => "MY_ACCESS_TOKEN", + "refresh_token" => "OTHER_REFRESH_TOKEN", + "expires_in" => nil + }) + + Mocks.OpenIDConnect.expect_userinfo(bypass) + + assert execute(%{}) == :ok + + provider = Repo.get!(Domain.Auth.Provider, provider.id) + + assert %{ + "access_token" => "MY_ACCESS_TOKEN", + "claims" => ^claims, + "expires_at" => expires_at, + "refresh_token" => "OIDC_REFRESH_TOKEN", + "userinfo" => %{ + "email" => "ada@example.com", + "email_verified" => true, + "family_name" => "Lovelace", + "given_name" => "Ada", + "locale" => "en", + "name" => "Ada Lovelace", + "picture" => + "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", + "sub" => "353690423699814251281" + } + } = provider.adapter_state + + assert expires_at + end + + test "does not crash when endpoint is not available", %{ + bypass: bypass + } do + Bypass.down(bypass) + assert execute(%{}) == :ok + end + end +end diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory.exs similarity index 90% rename from elixir/apps/domain/test/domain/auth/adapters/okta/jobs_test.exs rename to elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory.exs index 3e695f1e3..2dc23ef8f 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory.exs @@ -1,87 +1,10 @@ -defmodule Domain.Auth.Adapters.Okta.JobsTest do +defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true alias Domain.{Auth, Actors} alias Domain.Mocks.OktaDirectory - import Domain.Auth.Adapters.Okta.Jobs + import Domain.Auth.Adapters.Okta.Jobs.SyncDirectory - describe "refresh_access_tokens/1" do - setup do - account = Fixtures.Accounts.create_account() - - {provider, bypass} = - Fixtures.Auth.start_and_create_okta_provider(account: account) - - provider = - Domain.Fixture.update!(provider, %{ - adapter_state: %{ - "access_token" => "OIDC_ACCESS_TOKEN", - "refresh_token" => "OIDC_REFRESH_TOKEN", - "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute), - "claims" => "openid email profile offline_access" - } - }) - - identity = Fixtures.Auth.create_identity(account: account, provider: provider) - - %{ - bypass: bypass, - account: account, - provider: provider, - identity: identity - } - end - - test "refreshes the access token", %{ - provider: provider, - identity: identity, - bypass: bypass - } do - {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity) - - Mocks.OpenIDConnect.expect_refresh_token(bypass, %{ - "token_type" => "Bearer", - "id_token" => token, - "access_token" => "MY_ACCESS_TOKEN", - "refresh_token" => "OTHER_REFRESH_TOKEN", - "expires_in" => nil - }) - - Mocks.OpenIDConnect.expect_userinfo(bypass) - - assert refresh_access_tokens(%{}) == :ok - - provider = Repo.get!(Domain.Auth.Provider, provider.id) - - assert %{ - "access_token" => "MY_ACCESS_TOKEN", - "claims" => ^claims, - "expires_at" => expires_at, - "refresh_token" => "OIDC_REFRESH_TOKEN", - "userinfo" => %{ - "email" => "ada@example.com", - "email_verified" => true, - "family_name" => "Lovelace", - "given_name" => "Ada", - "locale" => "en", - "name" => "Ada Lovelace", - "picture" => - "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg", - "sub" => "353690423699814251281" - } - } = provider.adapter_state - - assert expires_at - end - - test "does not crash when endpoint is not available", %{ - bypass: bypass - } do - Bypass.down(bypass) - assert refresh_access_tokens(%{}) == :ok - end - end - - describe "sync_directory/1" do + describe "execute/1" do setup do account = Fixtures.Accounts.create_account() @@ -306,7 +229,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do OktaDirectory.mock_group_members_list_endpoint(bypass, group["id"], members) end) - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok groups = Actors.Group |> Repo.all() assert length(groups) == 2 @@ -350,7 +273,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do test "does not crash on endpoint errors", %{bypass: bypass} do Bypass.down(bypass) - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok assert Repo.aggregate(Actors.Group, :count) == 0 end @@ -400,7 +323,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do OktaDirectory.mock_groups_list_endpoint(bypass, []) OktaDirectory.mock_users_list_endpoint(bypass, users) - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok assert updated_identity = Repo.get(Domain.Auth.Identity, identity.id) @@ -729,7 +652,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do one_member ) - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok assert updated_group = Repo.get(Domain.Actors.Group, group.id) assert updated_group.name == "Group:Engineering" @@ -821,7 +744,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do end) end - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at @@ -837,7 +760,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do end) end - assert sync_directory(%{}) == :ok + assert execute(%{}) == :ok assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id) refute updated_provider.last_synced_at diff --git a/elixir/apps/domain/test/domain/auth_test.exs b/elixir/apps/domain/test/domain/auth_test.exs index eaf49b055..12c1f5be7 100644 --- a/elixir/apps/domain/test/domain/auth_test.exs +++ b/elixir/apps/domain/test/domain/auth_test.exs @@ -1513,7 +1513,7 @@ defmodule Domain.AuthTest do end end - describe "sync_provider_identities_multi/2" do + describe "sync_provider_identities/2" do setup do account = Fixtures.Accounts.create_account() @@ -1544,16 +1544,15 @@ defmodule Domain.AuthTest do provider_identifiers = Enum.map(attrs_list, & &1["provider_identifier"]) actor_names = Enum.map(attrs_list, & &1["actor"]["name"]) - multi = sync_provider_identities_multi(provider, attrs_list) - assert {:ok, %{ identities: [], - plan_identities: {insert, [], []}, - insert_identities: [_actor1, _actor2], - delete_identities: [], + plan: {insert, [], []}, + insert: [_actor1, _actor2], + update: [], + delete: [], actor_ids_by_provider_identifier: actor_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert Enum.all?(provider_identifiers, &(&1 in insert)) @@ -1608,16 +1607,15 @@ defmodule Domain.AuthTest do } ] - multi = sync_provider_identities_multi(provider, attrs_list) - assert {:ok, %{ identities: [_identity1, _identity2], - plan_identities: {[], update, []}, - delete_identities: [], - insert_identities: [], + plan: {[], update, []}, + delete: [], + update: [_updated_identity1, _updated_identity2], + insert: [], actor_ids_by_provider_identifier: actor_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert length(update) == 2 assert identity1.provider_identifier in update @@ -1659,16 +1657,14 @@ defmodule Domain.AuthTest do } ] - multi = sync_provider_identities_multi(provider, attrs_list) - assert {:ok, %{ identities: [fetched_identity], - plan_identities: {[], ["USER_ID1"], []}, - delete_identities: [], - insert_identities: [], + plan: {[], ["USER_ID1"], []}, + delete: [], + insert: [], actor_ids_by_provider_identifier: actor_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert fetched_identity.actor_id == identity.actor_id assert actor_ids_by_provider_identifier == %{"USER_ID1" => identity.actor_id} @@ -1695,16 +1691,15 @@ defmodule Domain.AuthTest do |> Fixtures.Auth.delete_identity() attrs_list = [] - multi = sync_provider_identities_multi(provider, attrs_list) assert {:ok, %{ identities: [fetched_identity], - plan_identities: {[], [], []}, - delete_identities: [], - insert_identities: [], + plan: {[], [], []}, + delete: [], + insert: [], actor_ids_by_provider_identifier: %{} - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert fetched_identity.id == identity.id @@ -1757,16 +1752,14 @@ defmodule Domain.AuthTest do attrs_list = [] - multi = sync_provider_identities_multi(provider, attrs_list) - assert {:ok, %{ identities: [_identity1, _identity2], - plan_identities: {[], [], delete}, - delete_identities: [deleted_identity1, deleted_identity2], - insert_identities: [], + plan: {[], [], delete}, + delete: [deleted_identity1, deleted_identity2], + insert: [], actor_ids_by_provider_identifier: actor_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert Enum.all?(provider_identifiers, &(&1 in delete)) assert deleted_identity1.provider_identifier in delete @@ -1805,18 +1798,15 @@ defmodule Domain.AuthTest do attrs_list = [] - multi = sync_provider_identities_multi(provider, attrs_list) - - assert Repo.transaction(multi) == + assert sync_provider_identities(provider, attrs_list) == {:ok, %{ identities: [], - plan_identities: {[], [], []}, - delete_identities: [], - insert_identities: [], - update_identities_and_actors: [], - actor_ids_by_provider_identifier: %{}, - recalculate_dynamic_groups: [] + plan: {[], [], []}, + delete: [], + update: [], + insert: [], + actor_ids_by_provider_identifier: %{} }} end @@ -1830,9 +1820,7 @@ defmodule Domain.AuthTest do } ] - multi = sync_provider_identities_multi(provider, attrs_list) - - assert {:error, :insert_identities, changeset, _effects_so_far} = Repo.transaction(multi) + assert {:error, changeset} = sync_provider_identities(provider, attrs_list) assert errors_on(changeset) == %{ actor: %{ @@ -1876,16 +1864,14 @@ defmodule Domain.AuthTest do } ] - multi = sync_provider_identities_multi(provider, attrs_list) - assert {:ok, %{ identities: [_identity1, _identity2], - plan_identities: {[], update, []}, - delete_identities: [], - insert_identities: [], + plan: {[], update, []}, + delete: [], + insert: [], actor_ids_by_provider_identifier: actor_ids_by_provider_identifier - }} = Repo.transaction(multi) + }} = sync_provider_identities(provider, attrs_list) assert length(update) == 2 assert update == ["USER_ID1", "USER_ID1"] diff --git a/elixir/apps/domain/test/domain/billing/jobs_test.exs b/elixir/apps/domain/test/domain/billing/jobs/check_account_limits_test.exs similarity index 91% rename from elixir/apps/domain/test/domain/billing/jobs_test.exs rename to elixir/apps/domain/test/domain/billing/jobs/check_account_limits_test.exs index 8699c3b65..605c03d8d 100644 --- a/elixir/apps/domain/test/domain/billing/jobs_test.exs +++ b/elixir/apps/domain/test/domain/billing/jobs/check_account_limits_test.exs @@ -1,8 +1,8 @@ -defmodule Domain.Billing.JobsTest do +defmodule Domain.Billing.Jobs.CheckAccountLimitsTest do use Domain.DataCase, async: true - import Domain.Billing.Jobs + import Domain.Billing.Jobs.CheckAccountLimits - describe "check_account_limits/1" do + describe "execute/1" do setup do account = Fixtures.Accounts.create_account( @@ -22,7 +22,7 @@ defmodule Domain.Billing.JobsTest do test "does nothing when limits are not violated", %{ account: account } do - assert check_account_limits(%{}) == :ok + assert execute(%{}) == :ok account = Repo.get!(Domain.Accounts.Account, account.id) refute account.warning @@ -55,7 +55,7 @@ defmodule Domain.Billing.JobsTest do } }) - assert check_account_limits(%{}) == :ok + assert execute(%{}) == :ok account = Repo.get!(Domain.Accounts.Account, account.id) diff --git a/elixir/apps/domain/test/domain/jobs/executors/concurrent_test.exs b/elixir/apps/domain/test/domain/jobs/executors/concurrent_test.exs new file mode 100644 index 000000000..7e8453207 --- /dev/null +++ b/elixir/apps/domain/test/domain/jobs/executors/concurrent_test.exs @@ -0,0 +1,84 @@ +defmodule Domain.Jobs.Executors.ConcurrentTest do + use Domain.DataCase, async: true + alias Domain.Fixtures + import Domain.Jobs.Executors.Concurrent + + def state(config) do + {:ok, {:state, config}} + end + + def execute({:state, config}) do + send(config[:test_pid], {:executed, self(), :erlang.monotonic_time()}) + :ok + end + + test "executes the handler on the interval" do + assert {:ok, _pid} = start_link({__MODULE__, 25, test_pid: self()}) + + assert_receive {:executed, _pid, time1}, 500 + assert_receive {:executed, _pid, time2}, 500 + + assert time1 < time2 + end + + test "delays initial message by the initial_delay" do + assert {:ok, _pid} = + start_link({ + __MODULE__, + 25, + test_pid: self(), initial_delay: 100 + }) + + refute_receive {:executed, _pid, _time}, 50 + assert_receive {:executed, _pid, _time}, 2000 + end + + describe "reject_locked/2" do + test "returns all rows if none are locked" do + account1 = Fixtures.Accounts.create_account() + account2 = Fixtures.Accounts.create_account() + rows = [account1, account2] + + Domain.Repo.checkout(fn -> + assert reject_locked("accounts", rows) == rows + end) + end + + test "does not allow two processes to lock the same rows" do + account1 = Fixtures.Accounts.create_account() + account2 = Fixtures.Accounts.create_account() + rows = [account1, account2] + + test_pid = self() + + task1 = + Task.async(fn -> + :ok = Ecto.Adapters.SQL.Sandbox.checkout(Domain.Repo) + + Domain.Repo.checkout(fn -> + rows = reject_locked("accounts", rows) + send(test_pid, {:locked, rows}) + Process.sleep(300) + end) + end) + + task2 = + Task.async(fn -> + :ok = Ecto.Adapters.SQL.Sandbox.checkout(Domain.Repo) + + Domain.Repo.checkout(fn -> + rows = reject_locked("accounts", rows) + send(test_pid, {:locked, rows}) + Process.sleep(300) + end) + end) + + assert_receive {:locked, rows1} + assert_receive {:locked, rows2} + assert length(rows1) + length(rows2) == length(rows) + + Task.ignore(task1) + Task.ignore(task2) + end + end +end diff --git a/elixir/apps/domain/test/domain/jobs/executors/global_test.exs b/elixir/apps/domain/test/domain/jobs/executors/globally_unique_test.exs similarity index 64% rename from elixir/apps/domain/test/domain/jobs/executors/global_test.exs rename to elixir/apps/domain/test/domain/jobs/executors/globally_unique_test.exs index 4d5e12f73..1cb873de6 100644 --- a/elixir/apps/domain/test/domain/jobs/executors/global_test.exs +++ b/elixir/apps/domain/test/domain/jobs/executors/globally_unique_test.exs @@ -1,14 +1,14 @@ -defmodule Domain.Jobs.Executors.GlobalTest do +defmodule Domain.Jobs.Executors.GloballyUniqueTest do use ExUnit.Case, async: true - import Domain.Jobs.Executors.Global + import Domain.Jobs.Executors.GloballyUnique - def send_test_message(config) do + def execute(config) do send(config[:test_pid], {:executed, self(), :erlang.monotonic_time()}) :ok end test "executes the handler on the interval" do - assert {:ok, _pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + assert {:ok, _pid} = start_link({__MODULE__, 25, test_pid: self()}) assert_receive {:executed, _pid, time1}, 500 assert_receive {:executed, _pid, time2}, 500 @@ -19,25 +19,25 @@ defmodule Domain.Jobs.Executors.GlobalTest do test "delays initial message by the initial_delay" do assert {:ok, _pid} = start_link({ - {__MODULE__, :send_test_message}, + __MODULE__, 25, test_pid: self(), initial_delay: 100 }) refute_receive {:executed, _pid, _time}, 50 - assert_receive {:executed, _pid, _time}, 1000 + assert_receive {:executed, _pid, _time}, 2000 end test "registers itself as a leader if there is no global name registered" do - assert {:ok, pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) - assert_receive {:executed, ^pid, _time}, 1000 - name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message} + assert {:ok, pid} = start_link({__MODULE__, 25, test_pid: self()}) + assert_receive {:executed, ^pid, _time}, 2000 + name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__} assert :global.whereis_name(name) == pid assert :sys.get_state(pid) == { { - {__MODULE__, :send_test_message}, + __MODULE__, 25, [test_pid: self()] }, @@ -47,17 +47,17 @@ defmodule Domain.Jobs.Executors.GlobalTest do test "other processes register themselves as fallbacks and monitor the leader" do assert {:ok, leader_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) assert {:ok, fallback1_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) assert {:ok, fallback2_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) assert_receive {:executed, ^leader_pid, _time}, 500 - name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message} + name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__} assert :global.whereis_name(name) == leader_pid assert {_state, {:fallback, ^leader_pid, _monitor_ref}} = :sys.get_state(fallback1_pid) @@ -66,13 +66,13 @@ defmodule Domain.Jobs.Executors.GlobalTest do test "other processes register a new leader when old one is down" do assert {:ok, leader_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) assert {:ok, fallback1_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) assert {:ok, fallback2_pid} = - start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) + start_link({__MODULE__, 25, test_pid: self()}) Process.flag(:trap_exit, true) Process.exit(leader_pid, :kill) @@ -91,7 +91,7 @@ defmodule Domain.Jobs.Executors.GlobalTest do assert {_state, {:fallback, ^new_leader_pid, _monitor_ref}} = :sys.get_state(fallback_pid) assert {_state, :leader} = :sys.get_state(new_leader_pid) - name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message} + name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__} assert :global.whereis_name(name) == new_leader_pid assert_receive {:executed, ^new_leader_pid, _time}, 1000 diff --git a/elixir/apps/domain/test/domain/jobs/recurrent_test.exs b/elixir/apps/domain/test/domain/jobs/recurrent_test.exs deleted file mode 100644 index 0fcba2453..000000000 --- a/elixir/apps/domain/test/domain/jobs/recurrent_test.exs +++ /dev/null @@ -1,43 +0,0 @@ -defmodule Domain.Jobs.RecurrentTest do - use ExUnit.Case, async: true - import Domain.Jobs.Recurrent - - defmodule TestDefinition do - use Domain.Jobs.Recurrent, otp_app: :domain - require Logger - - every seconds(1), :second_test, config do - send(config[:test_pid], :executed) - end - - every minutes(5), :minute_test do - :ok - end - end - - describe "seconds/1" do - test "converts seconds to milliseconds" do - assert seconds(1) == 1_000 - assert seconds(13) == 13_000 - end - end - - describe "minutes/1" do - test "converts minutes to milliseconds" do - assert minutes(1) == 60_000 - assert minutes(13) == 780_000 - end - end - - test "defines callbacks" do - assert length(TestDefinition.__handlers__()) == 2 - - assert {:minute_test, 300_000} in TestDefinition.__handlers__() - assert {:second_test, 1000} in TestDefinition.__handlers__() - - assert TestDefinition.minute_test(test_pid: self()) == :ok - - assert TestDefinition.second_test(test_pid: self()) == :executed - assert_receive :executed - end -end diff --git a/elixir/apps/domain/test/support/fixture.ex b/elixir/apps/domain/test/support/fixture.ex index c17bc3765..ed908b309 100644 --- a/elixir/apps/domain/test/support/fixture.ex +++ b/elixir/apps/domain/test/support/fixture.ex @@ -40,7 +40,7 @@ defmodule Domain.Fixture do end def unique_integer do - System.unique_integer([:positive]) + System.unique_integer([:positive, :monotonic]) end def unique_ipv4 do diff --git a/terraform/environments/production/portal.tf b/terraform/environments/production/portal.tf index 6156c3362..11b7beaa7 100644 --- a/terraform/environments/production/portal.tf +++ b/terraform/environments/production/portal.tf @@ -270,7 +270,7 @@ locals { cluster_version_label = "cluster_version" cluster_version = split(".", local.portal_image_tag)[0] node_name_label = "application" - polling_interval_ms = 7000 + polling_interval_ms = 10000 }) }, { diff --git a/terraform/environments/staging/portal.tf b/terraform/environments/staging/portal.tf index 82f8a81e0..ea1595d71 100644 --- a/terraform/environments/staging/portal.tf +++ b/terraform/environments/staging/portal.tf @@ -401,6 +401,12 @@ module "domain" { name = "BACKGROUND_JOBS_ENABLED" value = "true" }, + # Pool size is increased because background jobs are holding + # the connections for a long time + { + name = "DATABASE_POOL_SIZE" + value = "15" + }, ], local.shared_application_environment_variables) application_labels = {