refactor(portal): Allow concurrent updates to synced actor/actor identities during sync (#4409)

This commit is contained in:
Andrew Dryga
2024-04-18 16:08:06 -06:00
committed by GitHub
parent f024570c6c
commit 79f199830e
43 changed files with 1280 additions and 961 deletions

View File

@@ -121,12 +121,22 @@ defmodule Domain.Actors do
end
end
def sync_provider_groups_multi(%Auth.Provider{} = provider, attrs_list) do
Group.Sync.sync_provider_groups_multi(provider, attrs_list)
def sync_provider_groups(%Auth.Provider{} = provider, attrs_list) do
Group.Sync.sync_provider_groups(provider, attrs_list)
end
def sync_provider_memberships_multi(multi, %Auth.Provider{} = provider, tuples) do
Membership.Sync.sync_provider_memberships_multi(multi, provider, tuples)
def sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
%Auth.Provider{} = provider,
tuples
) do
Membership.Sync.sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples
)
end
def new_group(attrs \\ %{}) do

View File

@@ -1,9 +1,10 @@
defmodule Domain.Actors.Group.Sync do
alias Domain.Repo
alias Domain.Auth
alias Domain.Actors
alias Domain.Actors.Group
def sync_provider_groups_multi(%Auth.Provider{} = provider, attrs_list) do
def sync_provider_groups(%Auth.Provider{} = provider, attrs_list) do
attrs_by_provider_identifier =
for attrs <- attrs_list, into: %{} do
{Map.fetch!(attrs, "provider_identifier"), attrs}
@@ -11,46 +12,36 @@ defmodule Domain.Actors.Group.Sync do
provider_identifiers = Map.keys(attrs_by_provider_identifier)
Ecto.Multi.new()
|> Ecto.Multi.all(:groups, fn _effects_so_far ->
fetch_provider_groups_query(provider)
end)
|> Ecto.Multi.run(:plan_groups, fn _repo, %{groups: groups} ->
plan_groups_update(groups, provider_identifiers)
end)
|> Ecto.Multi.run(
:delete_groups,
fn repo, %{plan_groups: {_upsert, delete}} ->
delete_groups(repo, provider, delete)
end
)
|> Ecto.Multi.run(:upsert_groups, fn repo, %{plan_groups: {upsert, _delete}} ->
upsert_groups(repo, provider, attrs_by_provider_identifier, upsert)
end)
|> Ecto.Multi.run(
:group_ids_by_provider_identifier,
fn _repo,
%{
plan_groups: {_upsert, delete},
groups: groups,
upsert_groups: upsert_groups
} ->
group_ids_by_provider_identifier =
for group <- groups ++ upsert_groups,
group.provider_identifier not in delete,
into: %{} do
{group.provider_identifier, group.id}
end
with {:ok, groups} <- all_provider_groups(provider),
{:ok, {upsert, delete}} <- plan_groups_update(groups, provider_identifiers),
{:ok, deleted} <- delete_groups(provider, delete),
{:ok, upserted} <- upsert_groups(provider, attrs_by_provider_identifier, upsert) do
group_ids_by_provider_identifier =
for group <- groups ++ upserted,
group.provider_identifier not in delete,
into: %{} do
{group.provider_identifier, group.id}
end
{:ok, group_ids_by_provider_identifier}
end
)
{:ok,
%{
groups: groups,
plan: {upsert, delete},
deleted: deleted,
upserted: upserted,
group_ids_by_provider_identifier: group_ids_by_provider_identifier
}}
end
end
defp fetch_provider_groups_query(provider) do
Group.Query.not_deleted()
|> Group.Query.by_account_id(provider.account_id)
|> Group.Query.by_provider_id(provider.id)
defp all_provider_groups(provider) do
groups =
Group.Query.not_deleted()
|> Group.Query.by_account_id(provider.account_id)
|> Group.Query.by_provider_id(provider.id)
|> Repo.all()
{:ok, groups}
end
defp plan_groups_update(groups, provider_identifiers) do
@@ -66,7 +57,7 @@ defmodule Domain.Actors.Group.Sync do
{:ok, {upsert, delete}}
end
defp delete_groups(_repo, provider, provider_identifiers_to_delete) do
defp delete_groups(provider, provider_identifiers_to_delete) do
Group.Query.not_deleted()
|> Group.Query.by_account_id(provider.account_id)
|> Group.Query.by_provider_id(provider.id)
@@ -74,13 +65,13 @@ defmodule Domain.Actors.Group.Sync do
|> Actors.delete_groups()
end
defp upsert_groups(repo, provider, attrs_by_provider_identifier, provider_identifiers_to_upsert) do
defp upsert_groups(provider, attrs_by_provider_identifier, provider_identifiers_to_upsert) do
provider_identifiers_to_upsert
|> Enum.reduce_while({:ok, []}, fn provider_identifier, {:ok, acc} ->
attrs = Map.get(attrs_by_provider_identifier, provider_identifier)
attrs = Map.put(attrs, "type", :managed)
case upsert_group(repo, provider, attrs) do
case upsert_group(Repo, provider, attrs) do
{:ok, group} ->
{:cont, {:ok, [group | acc]}}

View File

@@ -1,61 +1,57 @@
defmodule Domain.Actors.Membership.Sync do
alias Domain.Repo
alias Domain.Auth
alias Domain.Actors
alias Domain.Actors.Membership
def sync_provider_memberships_multi(multi, %Auth.Provider{} = provider, tuples) do
multi
|> Ecto.Multi.all(:memberships, fn _effects_so_far ->
fetch_provider_memberships_query(provider)
end)
|> Ecto.Multi.run(
:plan_memberships,
fn _repo,
%{
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier,
group_ids_by_provider_identifier: group_ids_by_provider_identifier,
memberships: memberships
} ->
tuples =
Enum.flat_map(tuples, fn {group_provider_identifier, actor_provider_identifier} ->
group_id = Map.get(group_ids_by_provider_identifier, group_provider_identifier)
actor_id = Map.get(actor_ids_by_provider_identifier, actor_provider_identifier)
def sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
%Auth.Provider{} = provider,
tuples
) do
tuples =
Enum.flat_map(tuples, fn {group_provider_identifier, actor_provider_identifier} ->
group_id = Map.get(group_ids_by_provider_identifier, group_provider_identifier)
actor_id = Map.get(actor_ids_by_provider_identifier, actor_provider_identifier)
if is_nil(group_id) or is_nil(actor_id) do
[]
else
[{group_id, actor_id}]
end
end)
if is_nil(group_id) or is_nil(actor_id) do
[]
else
[{group_id, actor_id}]
end
end)
plan_memberships_update(tuples, memberships)
end
)
|> Ecto.Multi.delete_all(:delete_memberships, fn %{plan_memberships: {_insert, delete}} ->
delete_memberships_query(delete)
end)
|> Ecto.Multi.run(:insert_memberships, fn repo, %{plan_memberships: {insert, _delete}} ->
insert_memberships(repo, provider, insert)
end)
|> Ecto.Multi.run(:broadcast_membership_updates, fn
_repo, %{plan_memberships: {insert, delete}} ->
:ok =
Enum.each(insert, fn {group_id, actor_id} ->
Actors.broadcast_membership_event(:create, actor_id, group_id)
end)
with {:ok, memberships} <- all_provider_memberships(provider),
{:ok, {insert, delete}} <- plan_memberships_update(tuples, memberships),
deleted_stats = delete_memberships(delete),
{:ok, inserted} <- insert_memberships(provider, insert) do
:ok =
Enum.each(insert, fn {group_id, actor_id} ->
Actors.broadcast_membership_event(:create, actor_id, group_id)
end)
:ok =
Enum.each(delete, fn {group_id, actor_id} ->
Actors.broadcast_membership_event(:delete, actor_id, group_id)
end)
:ok =
Enum.each(delete, fn {group_id, actor_id} ->
Actors.broadcast_membership_event(:delete, actor_id, group_id)
end)
{:ok, :ok}
end)
{:ok,
%{
plan: {insert, delete},
inserted: inserted,
deleted_stats: deleted_stats
}}
end
end
defp fetch_provider_memberships_query(provider) do
Membership.Query.by_account_id(provider.account_id)
|> Membership.Query.by_group_provider_id(provider.id)
defp all_provider_memberships(provider) do
memberships =
Membership.Query.by_account_id(provider.account_id)
|> Membership.Query.by_group_provider_id(provider.id)
|> Repo.all()
{:ok, memberships}
end
defp plan_memberships_update(tuples, memberships) do
@@ -77,16 +73,17 @@ defmodule Domain.Actors.Membership.Sync do
{:ok, {insert, delete}}
end
defp delete_memberships_query(provider_identifiers_to_delete) do
defp delete_memberships(provider_identifiers_to_delete) do
Membership.Query.by_group_id_and_actor_id({:in, provider_identifiers_to_delete})
|> Repo.delete_all()
end
defp insert_memberships(repo, provider, provider_identifiers_to_insert) do
defp insert_memberships(provider, provider_identifiers_to_insert) do
provider_identifiers_to_insert
|> Enum.reduce_while({:ok, []}, fn {group_id, actor_id}, {:ok, acc} ->
attrs = %{group_id: group_id, actor_id: actor_id}
case upsert_membership(repo, provider, attrs) do
case upsert_membership(provider, attrs) do
{:ok, membership} ->
{:cont, {:ok, [membership | acc]}}
@@ -96,9 +93,9 @@ defmodule Domain.Actors.Membership.Sync do
end)
end
defp upsert_membership(repo, provider, attrs) do
defp upsert_membership(provider, attrs) do
Membership.Changeset.upsert(provider.account_id, %Membership{}, attrs)
|> repo.insert(
|> Repo.insert(
conflict_target: Membership.Changeset.upsert_conflict_target(),
on_conflict: Membership.Changeset.upsert_on_conflict(),
returning: true

View File

@@ -360,8 +360,8 @@ defmodule Domain.Auth do
end
end
def sync_provider_identities_multi(%Provider{} = provider, attrs_list) do
Identity.Sync.sync_provider_identities_multi(provider, attrs_list)
def sync_provider_identities(%Provider{} = provider, attrs_list) do
Identity.Sync.sync_provider_identities(provider, attrs_list)
end
def all_actor_ids_by_membership_rules!(account_id, membership_rules) do
@@ -789,6 +789,7 @@ defmodule Domain.Auth do
{:error, :not_found}
end
# Maybe we need a NOWAIT here to prevent timeouts when background jobs are updating the identity
defp maybe_fetch_subject_identity(subject, token) do
Identity.Query.not_disabled()
|> Identity.Query.by_id(token.identity_id)

View File

@@ -1,9 +1,20 @@
defmodule Domain.Auth.Adapter.DirectorySync do
alias Domain.Repo
alias Domain.Jobs.Executors.Concurrent
alias Domain.{Auth, Actors}
require Logger
require OpenTelemetry.Tracer
@async_data_fetch_timeout :infinity
# The Finch will timeout requests after 30 seconds,
# but there are a lot of requests that need to be made
# so we don't want to limit the timeout here
@async_data_fetch_timeout :timer.minutes(30)
# This timeout is used to limit the time spent on a single provider
# inserting the records into the database
@database_operations_timeout :timer.minutes(30)
@provider_sync_timeout @async_data_fetch_timeout + @database_operations_timeout
@doc """
Returns a tuple with the data needed to sync all entities of the provider.
@@ -51,7 +62,7 @@ defmodule Domain.Auth.Adapter.DirectorySync do
Where `user_message` user message will be rendered in the UI and `log_message` will be logged with
a level that corresponds to number of retries (see `log_sync_error/2`).
"""
@callback gather_provider_data(%Auth.Provider{}) ::
@callback gather_provider_data(%Auth.Provider{}, task_supervisor_pid :: pid()) ::
{:ok,
{
identities_attrs :: [
@@ -70,95 +81,206 @@ defmodule Domain.Auth.Adapter.DirectorySync do
| {:error, {:unauthorized, user_message :: String.t()}}
| {:error, user_message :: String.t(), log_message :: String.t()}
def sync_providers(module, providers) do
providers
|> Domain.Repo.preload(:account)
|> Enum.each(&sync_provider(module, &1))
def sync_providers(module, adapter, supervisor_pid) do
start_time = System.monotonic_time(:millisecond)
Domain.Repo.transaction(
fn ->
metadata = Logger.metadata()
pdict = Process.get()
all_providers = Domain.Auth.all_providers_pending_sync_by_adapter!(adapter)
providers = Concurrent.reject_locked("auth_providers", all_providers)
providers = Domain.Repo.preload(providers, :account)
Logger.info("Syncing #{length(providers)}/#{length(all_providers)} #{adapter} providers")
Task.Supervisor.async_stream_nolink(
supervisor_pid,
providers,
fn provider ->
OpenTelemetry.Tracer.with_span "sync_provider",
attributes: %{
account_id: provider.account_id,
provider_id: provider.id,
provider_adapter: provider.adapter
} do
:ok = maybe_reuse_connection(pdict)
Logger.metadata(metadata)
Logger.metadata(
account_id: provider.account_id,
provider_id: provider.id,
provider_adapter: provider.adapter
)
sync_provider(module, provider)
end
end,
timeout: @provider_sync_timeout,
max_concurrency: 3
)
|> Stream.run()
end,
# sync can take a long time so we will manage timeouts for each provider separately
timeout: :infinity
)
finish_time = System.monotonic_time(:millisecond)
Logger.info("Finished syncing in #{time_taken(start_time, finish_time)}")
end
defp sync_provider(module, provider) do
Logger.debug("Syncing provider",
account_id: provider.account_id,
provider_id: provider.id,
provider_adapter: provider.adapter
)
start_time = System.monotonic_time(:millisecond)
Logger.debug("Syncing provider")
with true <- Domain.Accounts.idp_sync_enabled?(provider.account),
{:ok, {identities_attrs, actor_groups_attrs, membership_tuples}} <-
module.gather_provider_data(provider) do
Ecto.Multi.new()
|> Ecto.Multi.one(:lock_provider, fn _effects_so_far ->
Auth.Provider.Query.not_disabled()
|> Auth.Provider.Query.by_account_id(provider.account_id)
|> Auth.Provider.Query.by_id(provider.id)
|> Auth.Provider.Query.lock()
end)
|> Ecto.Multi.append(Auth.sync_provider_identities_multi(provider, identities_attrs))
|> Ecto.Multi.append(Actors.sync_provider_groups_multi(provider, actor_groups_attrs))
|> Actors.sync_provider_memberships_multi(provider, membership_tuples)
|> Ecto.Multi.update(:save_last_updated_at, fn _effects_so_far ->
Auth.Provider.Changeset.sync_finished(provider)
end)
|> Repo.transaction(timeout: :timer.minutes(30))
|> case do
{:ok, effects} ->
log_sync_result(provider, effects)
:ok
{:ok, pid} = Task.Supervisor.start_link()
{:error, reason} ->
log_sync_error(
provider,
"Repo error: " <> inspect(reason)
)
if Domain.Accounts.idp_sync_enabled?(provider.account) do
with {:ok, data, data_fetch_time_taken} <- fetch_provider_data(module, provider, pid),
{:ok, db_operations_time_taken} <- insert_provider_updates(provider, data) do
finish_time = System.monotonic_time(:millisecond)
{:error, step, reason, _effects_so_far} ->
log_sync_error(
provider,
"Multi error at step " <> inspect(step) <> ": " <> inspect(reason)
)
:telemetry.execute(
[:domain, :directory_sync],
%{
data_fetch_total_time: data_fetch_time_taken,
db_operations_total_time: db_operations_time_taken,
total_time: finish_time - start_time
},
%{
account_id: provider.account_id,
provider_id: provider.id,
provider_adapter: provider.adapter
}
)
Logger.debug("Finished syncing provider in #{time_taken(start_time, finish_time)}")
else
_other ->
finish_time = System.monotonic_time(:millisecond)
Logger.debug("Failed to sync provider in #{time_taken(start_time, finish_time)}")
end
else
false ->
message = "IdP sync is not enabled in your subscription plan"
message = "IdP sync is not enabled in your subscription plan"
Auth.Provider.Changeset.sync_failed(provider, message)
|> Domain.Repo.update!()
{:error, {:unauthorized, user_message}} ->
Auth.Provider.Changeset.sync_requires_manual_intervention(provider, user_message)
|> Domain.Repo.update!()
{:error, nil, log_message} ->
log_sync_error(provider, log_message)
{:error, user_message, log_message} ->
Auth.Provider.Changeset.sync_failed(provider, user_message)
|> Domain.Repo.update!()
|> log_sync_error(log_message)
Auth.Provider.Changeset.sync_failed(provider, message)
|> Domain.Repo.update!()
end
end
defp log_sync_result(provider, effects) do
%{
# Identities
plan_identities: {identities_insert_ids, identities_update_ids, identities_delete_ids},
insert_identities: identities_inserted,
update_identities_and_actors: identities_updated,
delete_identities: identities_deleted,
# Groups
plan_groups: {groups_upsert_ids, groups_delete_ids},
upsert_groups: groups_upserted,
delete_groups: groups_deleted,
# Memberships
plan_memberships: {memberships_insert_tuples, memberships_delete_tuples},
insert_memberships: memberships_inserted,
delete_memberships: {deleted_memberships_count, _}
} = effects
defp fetch_provider_data(module, provider, task_supervisor_pid) do
OpenTelemetry.Tracer.with_span "sync_provider.fetch_data" do
start_time = System.monotonic_time(:millisecond)
Logger.debug("Finished syncing provider",
provider_id: provider.id,
provider_adapter: provider.adapter,
account_id: provider.account_id,
with {:ok, data} <- module.gather_provider_data(provider, task_supervisor_pid) do
finish_time = System.monotonic_time(:millisecond)
time_taken = time_taken(start_time, finish_time)
Logger.debug(
"Finished fetching data for provider in #{time_taken}",
account_id: provider.account_id,
provider_id: provider.id,
provider_adapter: provider.adapter,
time_taken: time_taken
)
{:ok, data, finish_time - start_time}
else
{:error, {:unauthorized, user_message}} ->
OpenTelemetry.Tracer.set_status(:error, inspect(user_message))
Auth.Provider.Changeset.sync_requires_manual_intervention(provider, user_message)
|> Domain.Repo.update!()
{:error, nil, log_message} ->
OpenTelemetry.Tracer.set_status(:error, inspect(log_message))
log_sync_error(provider, log_message)
{:error, user_message, log_message} ->
OpenTelemetry.Tracer.set_status(:error, inspect(log_message))
Auth.Provider.Changeset.sync_failed(provider, user_message)
|> Domain.Repo.update!()
|> log_sync_error(log_message)
end
end
end
defp insert_provider_updates(
provider,
{identities_attrs, actor_groups_attrs, membership_tuples}
) do
OpenTelemetry.Tracer.with_span "sync_provider.insert_data" do
Repo.checkout(
fn ->
start_time = System.monotonic_time(:millisecond)
with {:ok, identities_effects} <-
Auth.sync_provider_identities(provider, identities_attrs),
{:ok, groups_effects} <- Actors.sync_provider_groups(provider, actor_groups_attrs),
{:ok, memberships_effects} <-
Actors.sync_provider_memberships(
identities_effects.actor_ids_by_provider_identifier,
groups_effects.group_ids_by_provider_identifier,
provider,
membership_tuples
) do
Auth.Provider.Changeset.sync_finished(provider)
|> Repo.update!()
finish_time = System.monotonic_time(:millisecond)
log_sync_result(
start_time,
finish_time,
identities_effects,
groups_effects,
memberships_effects
)
{:ok, finish_time - start_time}
else
{:error, reason} ->
OpenTelemetry.Tracer.set_status(:error, inspect(reason))
log_sync_error(provider, "Repo error: " <> inspect(reason))
end
end,
timeout: @database_operations_timeout
)
end
end
defp log_sync_result(
start_time,
finish_time,
identities_effects,
groups_effects,
memberships_effects
) do
%{
plan: {identities_insert_ids, identities_update_ids, identities_delete_ids},
inserted: identities_inserted,
updated: identities_updated,
deleted: identities_deleted
} = identities_effects
%{
plan: {groups_upsert_ids, groups_delete_ids},
upserted: groups_upserted,
deleted: groups_deleted
} = groups_effects
%{
plan: {memberships_insert_tuples, memberships_delete_tuples},
inserted: memberships_inserted,
deleted_stats: {deleted_memberships_count, _}
} = memberships_effects
time_taken = time_taken(start_time, finish_time)
Logger.debug("Finished syncing provider in #{time_taken}",
time_taken: time_taken,
# Identities
plan_identities_insert: length(identities_insert_ids),
plan_identities_update: length(identities_update_ids),
@@ -179,6 +301,12 @@ defmodule Domain.Auth.Adapter.DirectorySync do
)
end
defp time_taken(start_time, finish_time) do
~T[00:00:00]
|> Time.add(finish_time - start_time, :millisecond)
|> to_string()
end
defp log_sync_error(provider, message) do
metadata = [
account_id: provider.account_id,
@@ -213,7 +341,10 @@ defmodule Domain.Auth.Adapter.DirectorySync do
Process.put(:last_caller_pid, caller_pid)
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
callback.()
OpenTelemetry.Tracer.with_span "sync_provider.fetch_data.#{name}" do
callback.()
end
end)
{name, task}
@@ -231,4 +362,26 @@ defmodule Domain.Auth.Adapter.DirectorySync do
end
end)
end
if Mix.env() == :test do
# We need this function to reuse the connection that was checked out in a parent process.
#
# `Ecto.SQL.Sandbox.allow/3` will not work in this case because it will try to checkout the same connection
# that is held by the parent process by `Repo.checkout/2` which will lead to a timeout, so we need to hack
# and reuse it manually.
def maybe_reuse_connection(pdict) do
pdict
|> Enum.filter(fn
{{Ecto.Adapters.SQL, _pid}, _} -> true
_ -> false
end)
|> Enum.each(fn {key, value} ->
Process.put(key, value)
end)
end
else
def maybe_reuse_connection(_pdict) do
:ok
end
end
end

View File

@@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace do
def init(_init_arg) do
children = [
GoogleWorkspace.APIClient,
{Task.Supervisor, name: __MODULE__.TaskSupervisor},
{Domain.Jobs, GoogleWorkspace.Jobs}
# Background Jobs
GoogleWorkspace.Jobs.RefreshAccessTokens,
GoogleWorkspace.Jobs.SyncDirectory
]
Supervisor.init(children, strategy: :one_for_one)

View File

@@ -0,0 +1,37 @@
# Recurring job (every 5 minutes, globally-unique executor) that refreshes
# access tokens for Google Workspace providers that are pending a refresh.
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokens do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.GloballyUnique
alias Domain.Auth.Adapters.GoogleWorkspace
require Logger
@impl true
# Fetches every Google Workspace provider pending a token refresh and
# refreshes each one in turn. A failure for one provider is logged
# (with the inspected reason) and does not stop the remaining providers.
def execute(_config) do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:google_workspace)
Logger.debug("Refreshing access tokens for #{length(providers)} providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case GoogleWorkspace.refresh_access_token(provider) do
# `provider` is rebound to the refreshed struct returned by the adapter
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
# Best-effort: log and continue; the next run will retry this provider
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
end
end

View File

@@ -1,49 +1,31 @@
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs do
use Domain.Jobs.Recurrent, otp_app: :domain
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(2),
executor: Domain.Jobs.Executors.Concurrent
alias Domain.Auth.Adapter.DirectorySync
alias Domain.Auth.Adapters.GoogleWorkspace
require Logger
@behaviour DirectorySync
@task_supervisor __MODULE__.TaskSupervisor
every minutes(5), :refresh_access_tokens do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:google_workspace)
Logger.debug("Refreshing access tokens for #{length(providers)} providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case GoogleWorkspace.refresh_access_token(provider) do
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
@impl true
def state(_config) do
{:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor)
{:ok, %{task_supervisor: pid}}
end
every minutes(3), :sync_directory do
providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:google_workspace)
Logger.debug("Syncing #{length(providers)} Google Workspace providers")
DirectorySync.sync_providers(__MODULE__, providers)
@impl true
def execute(%{task_supervisor: pid}) do
DirectorySync.sync_providers(__MODULE__, :google_workspace, pid)
end
def gather_provider_data(provider) do
def gather_provider_data(provider, task_supervisor_pid) do
access_token = provider.adapter_state["access_token"]
async_results =
DirectorySync.run_async_requests(GoogleWorkspace.TaskSupervisor,
DirectorySync.run_async_requests(task_supervisor_pid,
users: fn ->
GoogleWorkspace.APIClient.list_users(access_token)
end,

View File

@@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra do
def init(_init_arg) do
children = [
MicrosoftEntra.APIClient,
{Task.Supervisor, name: __MODULE__.TaskSupervisor},
{Domain.Jobs, MicrosoftEntra.Jobs}
# Background Jobs
MicrosoftEntra.Jobs.RefreshAccessTokens,
MicrosoftEntra.Jobs.SyncDirectory
]
Supervisor.init(children, strategy: :one_for_one)

View File

@@ -0,0 +1,37 @@
# Recurring job (every 5 minutes, globally-unique executor) that refreshes
# access tokens for Microsoft Entra providers that are pending a refresh.
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokens do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.GloballyUnique
alias Domain.Auth.Adapters.MicrosoftEntra
require Logger
@impl true
# Fetches every Microsoft Entra provider pending a token refresh and
# refreshes each one in turn. A failure for one provider is logged
# (with the inspected reason) and does not stop the remaining providers.
def execute(_config) do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:microsoft_entra)
Logger.debug("Refreshing access tokens for #{length(providers)} Microsoft Entra providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case MicrosoftEntra.refresh_access_token(provider) do
# `provider` is rebound to the refreshed struct returned by the adapter
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
# Best-effort: log and continue; the next run will retry this provider
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
end
end

View File

@@ -1,49 +1,31 @@
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs do
use Domain.Jobs.Recurrent, otp_app: :domain
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.Concurrent
alias Domain.Auth.Adapter.DirectorySync
alias Domain.Auth.Adapters.MicrosoftEntra
require Logger
@behaviour DirectorySync
@task_supervisor __MODULE__.TaskSupervisor
every minutes(5), :refresh_access_tokens do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:microsoft_entra)
Logger.debug("Refreshing access tokens for #{length(providers)} Microsoft Entra providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case MicrosoftEntra.refresh_access_token(provider) do
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
@impl true
def state(_config) do
{:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor)
{:ok, %{task_supervisor: pid}}
end
every minutes(3), :sync_directory do
providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:microsoft_entra)
Logger.debug("Syncing #{length(providers)} Microsoft Entra providers")
DirectorySync.sync_providers(__MODULE__, providers)
@impl true
def execute(%{task_supervisor: pid}) do
DirectorySync.sync_providers(__MODULE__, :microsoft_entra, pid)
end
def gather_provider_data(provider) do
def gather_provider_data(provider, task_supervisor_pid) do
access_token = provider.adapter_state["access_token"]
async_results =
DirectorySync.run_async_requests(MicrosoftEntra.TaskSupervisor,
DirectorySync.run_async_requests(task_supervisor_pid,
users: fn ->
MicrosoftEntra.APIClient.list_users(access_token)
end,

View File

@@ -17,8 +17,9 @@ defmodule Domain.Auth.Adapters.Okta do
def init(_init_arg) do
children = [
Okta.APIClient,
{Task.Supervisor, name: __MODULE__.TaskSupervisor},
{Domain.Jobs, Okta.Jobs}
# Background Jobs
Okta.Jobs.RefreshAccessTokens,
Okta.Jobs.SyncDirectory
]
Supervisor.init(children, strategy: :one_for_one)

View File

@@ -0,0 +1,37 @@
# Recurring job (every 5 minutes, globally-unique executor) that refreshes
# access tokens for Okta providers that are pending a refresh.
defmodule Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokens do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.GloballyUnique
alias Domain.Auth.Adapters.Okta
require Logger
@impl true
# Fetches every Okta provider pending a token refresh and refreshes each
# one in turn. A failure for one provider is logged (with the inspected
# reason) and does not stop the remaining providers.
def execute(_config) do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:okta)
Logger.debug("Refreshing access tokens for #{length(providers)} Okta providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case Okta.refresh_access_token(provider) do
# `provider` is rebound to the refreshed struct returned by the adapter
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
# Best-effort: log and continue; the next run will retry this provider
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
end
end

View File

@@ -1,50 +1,32 @@
defmodule Domain.Auth.Adapters.Okta.Jobs do
use Domain.Jobs.Recurrent, otp_app: :domain
defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectory do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.Concurrent
alias Domain.Auth.Adapter.DirectorySync
alias Domain.Auth.Adapters.Okta
require Logger
@behaviour DirectorySync
@task_supervisor __MODULE__.TaskSupervisor
every minutes(5), :refresh_access_tokens do
providers = Domain.Auth.all_providers_pending_token_refresh_by_adapter!(:okta)
Logger.debug("Refreshing access tokens for #{length(providers)} Okta providers")
Enum.each(providers, fn provider ->
Logger.debug("Refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
case Okta.refresh_access_token(provider) do
{:ok, provider} ->
Logger.debug("Finished refreshing access token",
provider_id: provider.id,
account_id: provider.account_id
)
{:error, reason} ->
Logger.error("Failed refreshing access token",
provider_id: provider.id,
account_id: provider.account_id,
reason: inspect(reason)
)
end
end)
@impl true
def state(_config) do
{:ok, pid} = Task.Supervisor.start_link(name: @task_supervisor)
{:ok, %{task_supervisor: pid}}
end
every minutes(3), :sync_directory do
providers = Domain.Auth.all_providers_pending_sync_by_adapter!(:okta)
Logger.debug("Syncing #{length(providers)} Okta providers")
DirectorySync.sync_providers(__MODULE__, providers)
@impl true
def execute(%{task_supervisor: pid}) do
DirectorySync.sync_providers(__MODULE__, :okta, pid)
end
def gather_provider_data(provider) do
def gather_provider_data(provider, task_supervisor_pid) do
endpoint = provider.adapter_config["api_base_url"]
access_token = provider.adapter_state["access_token"]
async_results =
DirectorySync.run_async_requests(Okta.TaskSupervisor,
DirectorySync.run_async_requests(task_supervisor_pid,
users: fn ->
Okta.APIClient.list_users(endpoint, access_token)
end,

View File

@@ -1,7 +1,8 @@
defmodule Domain.Auth.Identity.Sync do
alias Domain.Repo
alias Domain.Auth.{Identity, Provider}
def sync_provider_identities_multi(%Provider{} = provider, attrs_list) do
def sync_provider_identities(%Provider{} = provider, attrs_list) do
attrs_by_provider_identifier =
for attrs <- attrs_list, into: %{} do
{Map.fetch!(attrs, "provider_identifier"), attrs}
@@ -9,56 +10,42 @@ defmodule Domain.Auth.Identity.Sync do
provider_identifiers = Map.keys(attrs_by_provider_identifier)
Ecto.Multi.new()
|> Ecto.Multi.all(:identities, fn _effects_so_far ->
fetch_provider_identities_query(provider)
end)
|> Ecto.Multi.run(:plan_identities, fn _repo, %{identities: identities} ->
plan_identities_update(identities, provider_identifiers)
end)
|> Ecto.Multi.run(
:delete_identities,
fn repo, %{plan_identities: {_insert, _update, delete}} ->
delete_identities(repo, provider, delete)
end
)
|> Ecto.Multi.run(
:insert_identities,
fn repo, %{plan_identities: {insert, _update, _delete}} ->
insert_identities(repo, provider, attrs_by_provider_identifier, insert)
end
)
|> Ecto.Multi.run(
:update_identities_and_actors,
fn repo, %{identities: identities, plan_identities: {_insert, update, _delete}} ->
update_identities_and_actors(repo, identities, attrs_by_provider_identifier, update)
end
)
|> Ecto.Multi.run(
:actor_ids_by_provider_identifier,
fn _repo,
%{
update_identities_and_actors: update_identities,
insert_identities: insert_identities
} ->
actor_ids_by_provider_identifier =
for identity <- update_identities ++ insert_identities,
into: %{} do
{identity.provider_identifier, identity.actor_id}
end
{:ok, actor_ids_by_provider_identifier}
end
)
|> Ecto.Multi.run(:recalculate_dynamic_groups, fn _repo, _effects_so_far ->
with {:ok, identities} <- all_provider_identities(provider),
{:ok, {insert, update, delete}} <-
plan_identities_update(identities, provider_identifiers),
{:ok, deleted} <- delete_identities(provider, delete),
{:ok, inserted} <-
insert_identities(provider, attrs_by_provider_identifier, insert),
{:ok, updated} <-
update_identities_and_actors(identities, attrs_by_provider_identifier, update) do
Domain.Actors.update_dynamic_group_memberships(provider.account_id)
end)
actor_ids_by_provider_identifier =
for identity <- updated ++ inserted,
into: %{} do
{identity.provider_identifier, identity.actor_id}
end
{:ok,
%{
identities: identities,
plan: {insert, update, delete},
delete: deleted,
insert: inserted,
update: updated,
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}}
end
end
defp fetch_provider_identities_query(provider) do
Identity.Query.all()
|> Identity.Query.by_account_id(provider.account_id)
|> Identity.Query.by_provider_id(provider.id)
defp all_provider_identities(provider) do
identities =
Identity.Query.all()
|> Identity.Query.by_account_id(provider.account_id)
|> Identity.Query.by_provider_id(provider.id)
|> Repo.all()
{:ok, identities}
end
defp plan_identities_update(identities, provider_identifiers) do
@@ -85,7 +72,7 @@ defmodule Domain.Auth.Identity.Sync do
{:ok, {insert, update, delete}}
end
defp delete_identities(repo, provider, provider_identifiers_to_delete) do
defp delete_identities(provider, provider_identifiers_to_delete) do
provider_identifiers_to_delete = Enum.uniq(provider_identifiers_to_delete)
{_count, identities} =
@@ -94,7 +81,7 @@ defmodule Domain.Auth.Identity.Sync do
|> Identity.Query.by_provider_id(provider.id)
|> Identity.Query.by_provider_identifier({:in, provider_identifiers_to_delete})
|> Identity.Query.delete()
|> repo.update_all([])
|> Repo.update_all([])
:ok =
Enum.each(identities, fn identity ->
@@ -104,19 +91,14 @@ defmodule Domain.Auth.Identity.Sync do
{:ok, identities}
end
defp insert_identities(
repo,
provider,
attrs_by_provider_identifier,
provider_identifiers_to_insert
) do
defp insert_identities(provider, attrs_by_provider_identifier, provider_identifiers_to_insert) do
provider_identifiers_to_insert
|> Enum.uniq()
|> Enum.reduce_while({:ok, []}, fn provider_identifier, {:ok, acc} ->
attrs = Map.get(attrs_by_provider_identifier, provider_identifier)
changeset = Identity.Changeset.create_identity_and_actor(provider, attrs)
case repo.insert(changeset) do
case Repo.insert(changeset) do
{:ok, identity} ->
{:cont, {:ok, [identity | acc]}}
@@ -127,7 +109,6 @@ defmodule Domain.Auth.Identity.Sync do
end
defp update_identities_and_actors(
repo,
identities,
attrs_by_provider_identifier,
provider_identifiers_to_update
@@ -137,7 +118,7 @@ defmodule Domain.Auth.Identity.Sync do
|> Enum.filter(fn identity ->
identity.provider_identifier in provider_identifiers_to_update
end)
|> repo.preload(:actor)
|> Repo.preload(:actor)
|> Enum.reduce(%{}, fn identity, acc ->
acc_identity = Map.get(acc, identity.provider_identifier)
@@ -161,7 +142,7 @@ defmodule Domain.Auth.Identity.Sync do
attrs = Map.get(attrs_by_provider_identifier, provider_identifier)
changeset = Identity.Changeset.update_identity_and_actor(identity, attrs)
case repo.update(changeset) do
case Repo.update(changeset) do
{:ok, identity} ->
{:cont, {:ok, [identity | acc]}}

View File

@@ -56,6 +56,10 @@ defmodule Domain.Auth.Provider.Query do
|> where([providers: providers], is_nil(providers.sync_disabled_at))
end
def order_by_sync_priority(queryable) do
  # Sort ascending by `last_synced_at` with NULLs first: providers that have
  # never been synced take priority over those synced most recently.
  order_by(queryable, [providers: providers], asc_nulls_first: providers.last_synced_at)
end
def by_non_empty_refresh_token(queryable) do
where(
queryable,

View File

@@ -14,7 +14,7 @@ defmodule Domain.Billing do
def init(_opts) do
children = [
APIClient,
{Domain.Jobs, Jobs}
Jobs.CheckAccountLimits
]
if enabled?() do

View File

@@ -1,8 +1,13 @@
defmodule Domain.Billing.Jobs do
use Domain.Jobs.Recurrent, otp_app: :domain
defmodule Domain.Billing.Jobs.CheckAccountLimits do
use Domain.Jobs.Job,
otp_app: :domain,
every: :timer.minutes(5),
executor: Domain.Jobs.Executors.GloballyUnique
alias Domain.{Accounts, Billing, Actors, Clients, Gateways}
every minutes(30), :check_account_limits do
@impl true
def execute(_config) do
Accounts.all_active_accounts!()
|> Enum.each(fn account ->
if Billing.enabled?() and Billing.account_provisioned?(account) do

View File

@@ -173,7 +173,7 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
discovered_nodes_count: count
})
Logger.debug("Found #{count}", module: __MODULE__, nodes: Enum.join(nodes, ", "))
Logger.debug("Found #{count} nodes", module: __MODULE__, nodes: Enum.join(nodes, ", "))
{:ok, nodes}
end

View File

@@ -1,33 +0,0 @@
defmodule Domain.Jobs do
  @moduledoc """
  This module starts all recurrent job handlers defined by a module
  in individual processes and supervises them.
  """
  use Supervisor

  # Starts the supervisor for `module`, a jobs module that exposes
  # `__config__/0` and `__handlers__/0` (see `Domain.Jobs.Recurrent`).
  def start_link(module) do
    Supervisor.start_link(__MODULE__, module)
  end

  def init(module) do
    config = module.__config__()

    children =
      Enum.flat_map(module.__handlers__(), fn {name, interval} ->
        handler_config = Keyword.get(config, name, [])

        # A handler is started only when jobs are enabled globally for the
        # module AND for this specific handler (both flags default to true).
        if Keyword.get(config, :enabled, true) and Keyword.get(handler_config, :enabled, true) do
          [
            # Each handler runs under `Executors.Global` keyed by {module, name}
            # so the same handler is never started twice in this supervisor.
            Supervisor.child_spec(
              {Domain.Jobs.Executors.Global, {{module, name}, interval, handler_config}},
              id: {module, name}
            )
          ]
        else
          []
        end
      end)

    Supervisor.init(children, strategy: :one_for_one)
  end
end

View File

@@ -0,0 +1,119 @@
defmodule Domain.Jobs.Executors.Concurrent do
  @moduledoc """
  This module starts a GenServer that executes a callback function
  on a given interval on each node that runs the jobs. This means
  concurrency control should be implemented in the callback function
  itself (eg. by using `SELECT ... FOR UPDATE SKIP LOCKED`) or
  by using advisory locks (see `reject_locked/2`).

  If you need globally unique jobs see `Domain.Jobs.Executors.GloballyUnique`.
  """
  use GenServer

  require Logger
  require OpenTelemetry.Tracer

  @doc """
  Initializes the worker state.
  """
  @callback state(config :: term()) :: {:ok, state :: term()}

  @doc """
  Executes the callback function with the state created in `c:state/1`.
  """
  @callback execute(state :: term()) :: :ok

  def start_link({module, interval, config}) do
    GenServer.start_link(__MODULE__, {module, interval, config})
  end

  @impl true
  def init({module, interval, config}) do
    # The first tick is driven by the GenServer `:timeout` mechanism, so it
    # fires `:initial_delay` ms after init (defaults to 0).
    initial_delay = Keyword.get(config, :initial_delay, 0)

    with {:ok, worker_state} <- module.state(config) do
      {:ok, {module, worker_state, interval}, initial_delay}
    end
  end

  @impl true
  def handle_info(:timeout, {_module, _worker_state, interval} = state) do
    :ok = schedule_tick(interval)
    {:noreply, state}
  end

  @impl true
  def handle_info(:tick, {module, worker_state, interval} = state) do
    :ok = execute_handler(module, worker_state)
    :ok = schedule_tick(interval)
    {:noreply, state}
  end

  # tick is scheduled by using a timeout message instead of `:timer.send_interval/2`,
  # because we don't want jobs to overlap if they take too long to execute
  defp schedule_tick(interval) do
    _ = Process.send_after(self(), :tick, interval)
    :ok
  end

  defp execute_handler(module, worker_state) do
    # BUG FIX: the invoked callback is `execute/1` (see `@callback execute/1`
    # and the call below), but the log/span label previously said `execute/2`.
    job_callback = "#{module}.execute/1"

    attributes = [
      job_runner: __MODULE__,
      job_execution_id: Ecto.UUID.generate(),
      job_callback: job_callback
    ]

    Logger.metadata(attributes)

    OpenTelemetry.Tracer.with_span job_callback, attributes: attributes do
      _ = module.execute(worker_state)
    end

    :ok
  end

  @doc """
  A helper function that acquires an exclusive transaction-level advisory lock for each given row in the given table
  and returns only the rows that were successfully locked.

  This function is useful when you want to ensure that only one process is working on a given
  row(s) at a time, without using actual row-level locks that can cause deadlocks and timeouts
  for long-running transactions (like IdP syncs).

  Execution of this function should be wrapped in a transaction block (eg. `Ecto.Repo.transaction/2`),
  the locks are released when the transaction is committed or rolled back.

  ## Implementation notes

  Postgres allows to either use one `bigint` or two-`int` for advisory locks,
  we use the latter to avoid contention on a single value between processed tables
  by using the oid of the table as the first of the lock arguments.

  The lock is acquired by the `id` (second `int`) of the row but since our ids are UUIDs we also
  hash them to fit into `int` range, this opens a possibility of hash collisions but a negligible
  trade-off since the chances of a collision is very low and jobs will be restarted anyways only
  delaying their execution.

  `mod/2` is used to roll over the hash value to fit into the `int` range since `hashtext/1` return
  can change between Postgres versions.
  """
  def reject_locked(table, rows) do
    ids = Enum.map(rows, & &1.id)

    # `pg_try_advisory_xact_lock/2` returns true only when the lock was
    # acquired, so the query yields the ids this transaction now holds
    # (previously misnamed `not_locked_ids`).
    %Postgrex.Result{rows: acquired_rows} =
      Ecto.Adapters.SQL.query!(
        Domain.Repo,
        """
        SELECT id
        FROM unnest($1::text[]) AS t(id)
        WHERE pg_try_advisory_xact_lock(($2::text)::regclass::oid::int, mod(hashtext(t.id), 2147483647)::int)
        """,
        [ids, table]
      )

    # MapSet gives O(1) membership checks instead of `id in list` (O(n) each,
    # O(n^2) overall) for large sync batches.
    acquired_ids = MapSet.new(acquired_rows, fn [id] -> id end)
    Enum.filter(rows, fn row -> MapSet.member?(acquired_ids, row.id) end)
  end
end

View File

@@ -1,4 +1,4 @@
defmodule Domain.Jobs.Executors.Global do
defmodule Domain.Jobs.Executors.GloballyUnique do
@moduledoc """
This module is an abstraction on top of a GenServer that executes a callback function
on a given interval on a globally unique process in an Erlang cluster.
@@ -50,13 +50,18 @@ defmodule Domain.Jobs.Executors.Global do
require Logger
require OpenTelemetry.Tracer
def start_link({{module, function}, interval, config}) do
GenServer.start_link(__MODULE__, {{module, function}, interval, config})
@doc """
Executes the callback function with the given configuration.
"""
@callback execute(config :: term()) :: :ok
def start_link({module, interval, config}) do
GenServer.start_link(__MODULE__, {module, interval, config})
end
@impl true
def init({{module, function}, interval, config}) do
name = global_name(module, function)
def init({module, interval, config}) do
name = global_name(module)
# `random_notify_name` is used to avoid name conflicts in a cluster during deployments and
# network splits, it randomly selects one of the duplicate pids for registration,
@@ -66,52 +71,40 @@ defmodule Domain.Jobs.Executors.Global do
pid when is_pid(pid) <- :global.whereis_name(name) do
# we monitor the leader process so that we start a race to become a new leader when it's down
monitor_ref = Process.monitor(pid)
{:ok, {{{module, function}, interval, config}, {:fallback, pid, monitor_ref}}, :hibernate}
{:ok, {{module, interval, config}, {:fallback, pid, monitor_ref}}, :hibernate}
else
:yes ->
Logger.debug("Recurrent job will be handled on this node",
module: module,
function: function
)
Logger.debug("Recurrent job will be handled on this node", module: module)
initial_delay = Keyword.get(config, :initial_delay, 0)
{:ok, {{{module, function}, interval, config}, :leader}, initial_delay}
{:ok, {{module, interval, config}, :leader}, initial_delay}
:undefined ->
Logger.warning("Recurrent job leader exists but is not yet available",
module: module,
function: function
)
Logger.warning("Recurrent job leader exists but is not yet available", module: module)
_timer_ref = :timer.sleep(100)
init(module)
end
end
@impl true
def handle_info(:timeout, {{{_module, _name}, interval, _config}, :leader} = state) do
def handle_info(:timeout, {{_module, interval, _config}, :leader} = state) do
:ok = schedule_tick(interval)
{:noreply, state}
end
@impl true
def handle_info(
{:global_name_conflict, {__MODULE__, module, function}},
{{{module, function}, interval, config}, _leader_or_fallback} = state
{:global_name_conflict, {__MODULE__, module}},
{{module, interval, config}, _leader_or_fallback} = state
) do
name = global_name(module, function)
name = global_name(module)
with pid when is_pid(pid) <- :global.whereis_name(name) do
monitor_ref = Process.monitor(pid)
state = {{{module, function}, interval, config}, {:fallback, pid, monitor_ref}}
state = {{module, interval, config}, {:fallback, pid, monitor_ref}}
{:noreply, state, :hibernate}
else
:undefined ->
Logger.warning("Recurrent job name conflict",
module: module,
function: function
)
Logger.warning("Recurrent job name conflict", module: module)
_timer_ref = :timer.sleep(100)
handle_info({:global_name_conflict, module}, state)
end
@@ -119,7 +112,7 @@ defmodule Domain.Jobs.Executors.Global do
def handle_info(
{:DOWN, _ref, :process, _pid, reason},
{{{module, function}, interval, config}, {:fallback, pid, _monitor_ref}}
{{module, interval, config}, {:fallback, pid, _monitor_ref}}
) do
# Solves the "Retry Storm" antipattern
backoff_with_jitter = :rand.uniform(200) - 1
@@ -127,12 +120,11 @@ defmodule Domain.Jobs.Executors.Global do
Logger.info("Recurrent job leader is down",
module: module,
function: function,
leader_pid: inspect(pid),
leader_exit_reason: inspect(reason, pretty: true)
)
case init({{module, function}, interval, config}) do
case init({module, interval, config}) do
{:ok, state, :hibernate} -> {:noreply, state, :hibernate}
{:ok, state, _initial_delay} -> {:noreply, state, 0}
end
@@ -143,8 +135,8 @@ defmodule Domain.Jobs.Executors.Global do
{:noreply, state}
end
def handle_info(:tick, {{{module, function}, interval, config}, :leader} = state) do
:ok = execute_handler(module, function, config)
def handle_info(:tick, {{module, interval, config}, :leader} = state) do
:ok = execute_handler(module, config)
:ok = schedule_tick(interval)
{:noreply, state}
end
@@ -156,8 +148,8 @@ defmodule Domain.Jobs.Executors.Global do
:ok
end
defp execute_handler(module, function, config) do
job_callback = "#{module}.#{function}/1"
defp execute_handler(module, config) do
job_callback = "#{module}.execute/1"
attributes = [
job_runner: __MODULE__,
@@ -168,11 +160,11 @@ defmodule Domain.Jobs.Executors.Global do
Logger.metadata(attributes)
OpenTelemetry.Tracer.with_span job_callback, attributes: attributes do
_ = apply(module, function, [config])
_ = module.execute(config)
end
:ok
end
defp global_name(module, function), do: {__MODULE__, module, function}
defp global_name(module), do: {__MODULE__, module}
end

View File

@@ -0,0 +1,27 @@
defmodule Domain.Jobs.Job do
  @moduledoc """
  Provides `use Domain.Jobs.Job` to turn a module into a supervisable
  recurrent job: the configured `:executor` process invokes the module's
  callbacks every `:every` milliseconds.
  """

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts, location: :keep] do
      @otp_app Keyword.fetch!(opts, :otp_app)
      @interval Keyword.fetch!(opts, :every)
      @executor Keyword.fetch!(opts, :executor)

      # The executor module defines the callback contract the job implements.
      @behaviour @executor

      def child_spec(_opts) do
        config = __config__()

        if Keyword.get(config, :enabled, true) do
          Supervisor.child_spec({@executor, {__MODULE__, @interval, config}}, id: __MODULE__)
        else
          # NOTE(review): `:ignore` is a valid return from `init/1`, not a
          # child spec — confirm supervisors accept it here when disabled.
          :ignore
        end
      end

      @doc """
      Returns the configuration for the job that is defined in the application environment.
      """
      @spec __config__() :: Keyword.t()
      def __config__, do: Domain.Config.get_env(@otp_app, __MODULE__, [])
    end
  end
end

View File

@@ -1,56 +0,0 @@
defmodule Domain.Jobs.Recurrent do
  # NOTE(review): this `@doc` attaches to `__using__/1` below; it reads like
  # it was intended to be the `@moduledoc`.
  @doc """
  This module provides a DSL to define recurrent jobs that run on a time interval basis.
  """
  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do
      import Domain.Jobs.Recurrent

      # Accumulate handlers and define a `__handlers__/0` function to list them
      @before_compile Domain.Jobs.Recurrent
      Module.register_attribute(__MODULE__, :handlers, accumulate: true)

      # Will read the config from the application environment
      @otp_app Keyword.fetch!(opts, :otp_app)

      @spec __config__() :: Keyword.t()
      def __config__, do: Domain.Config.get_env(@otp_app, __MODULE__, [])
    end
  end

  defmacro __before_compile__(_env) do
    quote do
      @spec __handlers__() :: [{atom(), pos_integer()}]
      def __handlers__, do: @handlers
    end
  end

  @doc """
  Defines a code to execute every `interval` milliseconds.

  It is recommended to use `seconds/1`, `minutes/1` macros to define the interval.

  Under the hood it defines a function `execute(name, interval, do: ..)` and adds its name to the
  module attribute.
  """
  defmacro every(interval, name, do: block) do
    quote do
      @handlers {unquote(name), unquote(interval)}
      # The handler ignores its config argument (bound as `_config`).
      def unquote(name)(unquote(Macro.var(:_config, nil))), do: unquote(block)
    end
  end

  # Variant that binds the runtime config to the caller-supplied pattern.
  defmacro every(interval, name, config, do: block) do
    quote do
      @handlers {unquote(name), unquote(interval)}
      def unquote(name)(unquote(config)), do: unquote(block)
    end
  end

  # Converts seconds to milliseconds via `:timer.seconds/1`.
  def seconds(num) do
    :timer.seconds(num)
  end

  # Converts minutes to milliseconds via `:timer.minutes/1`.
  def minutes(num) do
    :timer.minutes(num)
  end
end

View File

@@ -77,7 +77,18 @@ defmodule Domain.Telemetry do
# Application metrics
last_value("domain.relays.online_relays_count"),
last_value("domain.metrics.discovered_nodes_count")
last_value("domain.metrics.discovered_nodes_count"),
## Directory Syncs
summary("domain.directory_sync.data_fetch_total_time",
tags: [:account_id, :provider_id, :provider_adapter]
),
summary("domain.directory_sync.db_operations_total_time",
tags: [:account_id, :provider_id, :provider_adapter]
),
distribution("domain.directory_sync.total_time",
tags: [:account_id, :provider_id, :provider_adapter]
)
]
end

View File

@@ -12,7 +12,7 @@ defmodule Domain.Tokens do
@impl true
def init(_init_arg) do
children = [
{Domain.Jobs, Jobs}
Jobs.DeleteExpiredTokens
]
Supervisor.init(children, strategy: :one_for_one)

View File

@@ -1,10 +0,0 @@
defmodule Domain.Tokens.Jobs do
  use Domain.Jobs.Recurrent, otp_app: :domain
  alias Domain.Tokens
  require Logger

  # Purges expired tokens every 5 minutes. The match on `{:ok, _count}`
  # asserts success so a failed purge crashes (and restarts) the job runner.
  every minutes(5), :delete_expired_tokens do
    {:ok, _count} = Tokens.delete_expired_tokens()
    :ok
  end
end

View File

@@ -0,0 +1,15 @@
defmodule Domain.Tokens.Jobs.DeleteExpiredTokens do
  # Recurrent job that purges expired tokens every 5 minutes; the
  # GloballyUnique executor runs it on a single node in the cluster.
  use Domain.Jobs.Job,
    otp_app: :domain,
    every: :timer.minutes(5),
    executor: Domain.Jobs.Executors.GloballyUnique

  alias Domain.Tokens
  require Logger

  @impl true
  def execute(_config) do
    # Match asserts success; a failed delete crashes so the executor retries
    # on the next tick.
    {:ok, _count} = Tokens.delete_expired_tokens()
    :ok
  end
end

View File

@@ -585,7 +585,7 @@ defmodule Domain.ActorsTest do
end
end
describe "sync_provider_groups_multi/2" do
describe "sync_provider_groups/2" do
setup do
account = Fixtures.Accounts.create_account()
@@ -601,15 +601,13 @@ defmodule Domain.ActorsTest do
%{"name" => "OrgUnit:Engineering", "provider_identifier" => "OU:OU_ID1"}
]
multi = sync_provider_groups_multi(provider, attrs_list)
assert {:ok,
%{
plan_groups: {upsert, []},
delete_groups: [],
upsert_groups: [_group1, _group2],
plan: {upsert, []},
deleted: [],
upserted: [_group1, _group2],
group_ids_by_provider_identifier: group_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_groups(provider, attrs_list)
assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in upsert))
groups = Repo.all(Actors.Group)
@@ -651,15 +649,13 @@ defmodule Domain.ActorsTest do
%{"name" => "OrgUnit:Engineering", "provider_identifier" => "OU:OU_ID1"}
]
multi = sync_provider_groups_multi(provider, attrs_list)
assert {:ok,
%{
plan_groups: {upsert, []},
delete_groups: [],
upsert_groups: [_group1, _group2],
plan: {upsert, []},
deleted: [],
upserted: [_group1, _group2],
group_ids_by_provider_identifier: group_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_groups(provider, attrs_list)
assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in upsert))
assert Repo.aggregate(Actors.Group, :count) == 2
@@ -719,16 +715,14 @@ defmodule Domain.ActorsTest do
attrs_list = []
multi = sync_provider_groups_multi(provider, attrs_list)
assert {:ok,
%{
groups: [_group1, _group2],
plan_groups: {[], delete},
delete_groups: [deleted_group1, deleted_group2],
upsert_groups: [],
plan: {[], delete},
deleted: [deleted_group1, deleted_group2],
upserted: [],
group_ids_by_provider_identifier: group_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_groups(provider, attrs_list)
assert Enum.all?(["G:GROUP_ID1", "OU:OU_ID1"], &(&1 in delete))
assert deleted_group1.provider_identifier in ["G:GROUP_ID1", "OU:OU_ID1"]
@@ -772,21 +766,19 @@ defmodule Domain.ActorsTest do
attrs_list = []
multi = sync_provider_groups_multi(provider, attrs_list)
assert Repo.transaction(multi) ==
assert sync_provider_groups(provider, attrs_list) ==
{:ok,
%{
groups: [],
plan_groups: {[], []},
delete_groups: [],
upsert_groups: [],
plan: {[], []},
deleted: [],
upserted: [],
group_ids_by_provider_identifier: %{}
}}
end
end
describe "sync_provider_memberships_multi/2" do
describe "sync_provider_memberships/2" do
setup do
account = Fixtures.Accounts.create_account()
@@ -870,12 +862,6 @@ defmodule Domain.ActorsTest do
group2.provider_identifier => group2.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
:ok = subscribe_to_membership_updates_for_actor(actor1)
:ok = subscribe_to_membership_updates_for_actor(actor2)
:ok = Domain.Policies.subscribe_to_events_for_actor(actor1)
@@ -885,10 +871,16 @@ defmodule Domain.ActorsTest do
assert {:ok,
%{
plan_memberships: {insert, []},
delete_memberships: {0, nil},
insert_memberships: [_membership1, _membership2]
}} = Repo.transaction(multi)
plan: {insert, []},
deleted_stats: {0, nil},
inserted: [_membership1, _membership2]
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert {group1.id, identity1.actor_id} in insert
assert {group2.id, identity2.actor_id} in insert
@@ -941,12 +933,6 @@ defmodule Domain.ActorsTest do
group2.provider_identifier => group2.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
:ok = subscribe_to_membership_updates_for_actor(identity1.actor_id)
:ok = subscribe_to_membership_updates_for_actor(identity2.actor_id)
:ok = Domain.Policies.subscribe_to_events_for_actor(identity1.actor_id)
@@ -956,10 +942,16 @@ defmodule Domain.ActorsTest do
assert {:ok,
%{
plan_memberships: {[], []},
delete_memberships: {0, nil},
insert_memberships: []
}} = Repo.transaction(multi)
plan: {[], []},
deleted_stats: {0, nil},
inserted: []
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert Repo.aggregate(Actors.Membership, :count) == 2
assert Repo.aggregate(Actors.Membership.Query.all(), :count) == 2
@@ -1021,12 +1013,6 @@ defmodule Domain.ActorsTest do
group2.provider_identifier => group2.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
:ok = subscribe_to_membership_updates_for_actor(identity1.actor_id)
:ok = subscribe_to_membership_updates_for_actor(identity2.actor_id)
:ok = Domain.Policies.subscribe_to_events_for_actor(identity1.actor_id)
@@ -1037,10 +1023,16 @@ defmodule Domain.ActorsTest do
assert {:ok,
%{
plan_memberships: {[], delete},
delete_memberships: {2, nil},
insert_memberships: []
}} = Repo.transaction(multi)
plan: {[], delete},
deleted_stats: {2, nil},
inserted: []
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert {group1.id, identity1.actor_id} in delete
assert {group2.id, identity2.actor_id} in delete
@@ -1095,18 +1087,18 @@ defmodule Domain.ActorsTest do
group1.provider_identifier => group1.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
assert {:ok,
%{
plan_memberships: {[], delete},
insert_memberships: [],
delete_memberships: {1, nil}
}} = Repo.transaction(multi)
plan: {[], delete},
inserted: [],
deleted_stats: {1, nil}
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert delete == [{group2.id, identity2.actor_id}]
@@ -1136,18 +1128,18 @@ defmodule Domain.ActorsTest do
group2.provider_identifier => group2.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
assert {:ok,
%{
plan_memberships: {[], []},
delete_memberships: {0, nil},
insert_memberships: []
}} = Repo.transaction(multi)
plan: {[], []},
deleted_stats: {0, nil},
inserted: []
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
end
test "deletes actors that are not processed by identity sync", %{
@@ -1182,18 +1174,18 @@ defmodule Domain.ActorsTest do
group2.provider_identifier => group2.id
}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
assert {:ok,
%{
plan_memberships: {[], delete},
delete_memberships: {2, nil},
insert_memberships: []
}} = Repo.transaction(multi)
plan: {[], delete},
deleted_stats: {2, nil},
inserted: []
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert {group1.id, identity1.actor_id} in delete
assert {group2.id, identity2.actor_id} in delete
@@ -1231,18 +1223,18 @@ defmodule Domain.ActorsTest do
group_ids_by_provider_identifier = %{}
multi =
Ecto.Multi.new()
|> Ecto.Multi.put(:actor_ids_by_provider_identifier, actor_ids_by_provider_identifier)
|> Ecto.Multi.put(:group_ids_by_provider_identifier, group_ids_by_provider_identifier)
|> sync_provider_memberships_multi(provider, tuples_list)
assert {:ok,
%{
plan_memberships: {[], delete},
delete_memberships: {2, nil},
insert_memberships: []
}} = Repo.transaction(multi)
plan: {[], delete},
deleted_stats: {2, nil},
inserted: []
}} =
sync_provider_memberships(
actor_ids_by_provider_identifier,
group_ids_by_provider_identifier,
provider,
tuples_list
)
assert {group1.id, identity1.actor_id} in delete
assert {group2.id, identity2.actor_id} in delete

View File

@@ -0,0 +1,81 @@
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokensTest do
  use Domain.DataCase, async: true
  import Domain.Auth.Adapters.GoogleWorkspace.Jobs.RefreshAccessTokens

  describe "execute/1" do
    setup do
      account = Fixtures.Accounts.create_account()

      {provider, bypass} =
        Fixtures.Auth.start_and_create_google_workspace_provider(account: account)

      # Seed an adapter state whose token expires soon (15 min) so the job
      # considers it due for refresh.
      provider =
        Domain.Fixture.update!(provider, %{
          adapter_state: %{
            "access_token" => "OIDC_ACCESS_TOKEN",
            "refresh_token" => "OIDC_REFRESH_TOKEN",
            "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
            "claims" => "openid email profile offline_access"
          }
        })

      identity = Fixtures.Auth.create_identity(account: account, provider: provider)

      %{
        bypass: bypass,
        account: account,
        provider: provider,
        identity: identity
      }
    end

    test "refreshes the access token", %{
      provider: provider,
      identity: identity,
      bypass: bypass
    } do
      {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)

      Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
        "token_type" => "Bearer",
        "id_token" => token,
        "access_token" => "MY_ACCESS_TOKEN",
        "refresh_token" => "OTHER_REFRESH_TOKEN",
        "expires_in" => nil
      })

      Mocks.OpenIDConnect.expect_userinfo(bypass)

      assert execute(%{}) == :ok

      provider = Repo.get!(Domain.Auth.Provider, provider.id)

      # The original "OIDC_REFRESH_TOKEN" is asserted (not the mocked
      # "OTHER_REFRESH_TOKEN"): the adapter keeps the existing refresh token.
      assert %{
               "access_token" => "MY_ACCESS_TOKEN",
               "claims" => ^claims,
               "expires_at" => expires_at,
               "refresh_token" => "OIDC_REFRESH_TOKEN",
               "userinfo" => %{
                 "email" => "ada@example.com",
                 "email_verified" => true,
                 "family_name" => "Lovelace",
                 "given_name" => "Ada",
                 "locale" => "en",
                 "name" => "Ada Lovelace",
                 "picture" =>
                   "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
                 "sub" => "353690423699814251281"
               }
             } = provider.adapter_state

      assert expires_at
    end

    # BUG FIX: test name typo corrected ("endpoint it not available").
    test "does not crash when endpoint is not available", %{
      bypass: bypass
    } do
      Bypass.down(bypass)
      assert execute(%{}) == :ok
    end
  end
end

View File

@@ -1,87 +1,10 @@
defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors}
alias Domain.Mocks.GoogleWorkspaceDirectory
import Domain.Auth.Adapters.GoogleWorkspace.Jobs
import Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory
describe "refresh_access_tokens/1" do
setup do
account = Fixtures.Accounts.create_account()
{provider, bypass} =
Fixtures.Auth.start_and_create_google_workspace_provider(account: account)
provider =
Domain.Fixture.update!(provider, %{
adapter_state: %{
"access_token" => "OIDC_ACCESS_TOKEN",
"refresh_token" => "OIDC_REFRESH_TOKEN",
"expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
"claims" => "openid email profile offline_access"
}
})
identity = Fixtures.Auth.create_identity(account: account, provider: provider)
%{
bypass: bypass,
account: account,
provider: provider,
identity: identity
}
end
test "refreshes the access token", %{
provider: provider,
identity: identity,
bypass: bypass
} do
{token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)
Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
"token_type" => "Bearer",
"id_token" => token,
"access_token" => "MY_ACCESS_TOKEN",
"refresh_token" => "OTHER_REFRESH_TOKEN",
"expires_in" => nil
})
Mocks.OpenIDConnect.expect_userinfo(bypass)
assert refresh_access_tokens(%{}) == :ok
provider = Repo.get!(Domain.Auth.Provider, provider.id)
assert %{
"access_token" => "MY_ACCESS_TOKEN",
"claims" => ^claims,
"expires_at" => expires_at,
"refresh_token" => "OIDC_REFRESH_TOKEN",
"userinfo" => %{
"email" => "ada@example.com",
"email_verified" => true,
"family_name" => "Lovelace",
"given_name" => "Ada",
"locale" => "en",
"name" => "Ada Lovelace",
"picture" =>
"https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
"sub" => "353690423699814251281"
}
} = provider.adapter_state
assert expires_at
end
test "does not crash when endpoint it not available", %{
bypass: bypass
} do
Bypass.down(bypass)
assert refresh_access_tokens(%{}) == :ok
end
end
describe "sync_directory/1" do
describe "execute/1" do
setup do
account = Fixtures.Accounts.create_account()
@@ -98,7 +21,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
test "returns error when IdP sync is not enabled", %{account: account, provider: provider} do
{:ok, _account} = Domain.Accounts.update_account(account, %{features: %{idp_sync: false}})
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -254,7 +178,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, group["id"], members)
end)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
groups = Actors.Group |> Repo.all()
assert length(groups) == 2
@@ -300,7 +225,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
Bypass.down(bypass)
GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/")
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert Repo.aggregate(Actors.Group, :count) == 0
end
@@ -343,7 +269,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
GoogleWorkspaceDirectory.mock_organization_units_list_endpoint(bypass, [])
GoogleWorkspaceDirectory.mock_users_list_endpoint(bypass, users)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_identity =
Repo.get(Domain.Auth.Identity, identity.id)
@@ -574,7 +501,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, "GROUP_ID1", two_members)
GoogleWorkspaceDirectory.mock_group_members_list_endpoint(bypass, "GROUP_ID2", one_member)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_group = Repo.get(Domain.Actors.Group, group.id)
assert updated_group.name == "Group:Infrastructure"
@@ -704,7 +632,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -725,7 +654,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -791,7 +721,8 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at

View File

@@ -0,0 +1,81 @@
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokensTest do
  use Domain.DataCase, async: true
  import Domain.Auth.Adapters.MicrosoftEntra.Jobs.RefreshAccessTokens

  describe "execute/1" do
    setup do
      # Create an account with a Microsoft Entra provider whose adapter state
      # carries a refresh token that expires in 15 minutes, so the refresh job
      # picks the provider up for a token refresh.
      account = Fixtures.Accounts.create_account()

      {provider, bypass} =
        Fixtures.Auth.start_and_create_microsoft_entra_provider(account: account)

      provider =
        Domain.Fixture.update!(provider, %{
          adapter_state: %{
            "access_token" => "OIDC_ACCESS_TOKEN",
            "refresh_token" => "OIDC_REFRESH_TOKEN",
            "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
            "claims" => "openid email profile offline_access"
          }
        })

      identity = Fixtures.Auth.create_identity(account: account, provider: provider)

      %{
        bypass: bypass,
        account: account,
        provider: provider,
        identity: identity
      }
    end

    test "refreshes the access token", %{
      provider: provider,
      identity: identity,
      bypass: bypass
    } do
      {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)

      # Stub the IdP token endpoint: it hands back a fresh access token and a
      # new refresh token (the job is expected to keep the original one).
      Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
        "token_type" => "Bearer",
        "id_token" => token,
        "access_token" => "MY_ACCESS_TOKEN",
        "refresh_token" => "OTHER_REFRESH_TOKEN",
        "expires_in" => nil
      })

      Mocks.OpenIDConnect.expect_userinfo(bypass)

      assert execute(%{}) == :ok

      provider = Repo.get!(Domain.Auth.Provider, provider.id)

      # The adapter state is updated with the new access token and userinfo,
      # while the stored refresh token stays "OIDC_REFRESH_TOKEN" — the new
      # "OTHER_REFRESH_TOKEN" from the response is not persisted.
      assert %{
               "access_token" => "MY_ACCESS_TOKEN",
               "claims" => ^claims,
               "expires_at" => expires_at,
               "refresh_token" => "OIDC_REFRESH_TOKEN",
               "userinfo" => %{
                 "email" => "ada@example.com",
                 "email_verified" => true,
                 "family_name" => "Lovelace",
                 "given_name" => "Ada",
                 "locale" => "en",
                 "name" => "Ada Lovelace",
                 "picture" =>
                   "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
                 "sub" => "353690423699814251281"
               }
             } = provider.adapter_state

      # "expires_in" was nil in the mocked response, but the job must still
      # set some expiration on the refreshed state.
      assert expires_at
    end

    # Fixed typo in the test name ("it not" -> "is not") for consistency with
    # the sibling Okta RefreshAccessTokensTest module.
    test "does not crash when endpoint is not available", %{
      bypass: bypass
    } do
      Bypass.down(bypass)

      assert execute(%{}) == :ok
    end
  end
end

View File

@@ -1,87 +1,10 @@
defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors}
alias Domain.Mocks.MicrosoftEntraDirectory
import Domain.Auth.Adapters.MicrosoftEntra.Jobs
import Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory
describe "refresh_access_tokens/1" do
setup do
account = Fixtures.Accounts.create_account()
{provider, bypass} =
Fixtures.Auth.start_and_create_microsoft_entra_provider(account: account)
provider =
Domain.Fixture.update!(provider, %{
adapter_state: %{
"access_token" => "OIDC_ACCESS_TOKEN",
"refresh_token" => "OIDC_REFRESH_TOKEN",
"expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
"claims" => "openid email profile offline_access"
}
})
identity = Fixtures.Auth.create_identity(account: account, provider: provider)
%{
bypass: bypass,
account: account,
provider: provider,
identity: identity
}
end
test "refreshes the access token", %{
provider: provider,
identity: identity,
bypass: bypass
} do
{token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)
Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
"token_type" => "Bearer",
"id_token" => token,
"access_token" => "MY_ACCESS_TOKEN",
"refresh_token" => "OTHER_REFRESH_TOKEN",
"expires_in" => nil
})
Mocks.OpenIDConnect.expect_userinfo(bypass)
assert refresh_access_tokens(%{}) == :ok
provider = Repo.get!(Domain.Auth.Provider, provider.id)
assert %{
"access_token" => "MY_ACCESS_TOKEN",
"claims" => ^claims,
"expires_at" => expires_at,
"refresh_token" => "OIDC_REFRESH_TOKEN",
"userinfo" => %{
"email" => "ada@example.com",
"email_verified" => true,
"family_name" => "Lovelace",
"given_name" => "Ada",
"locale" => "en",
"name" => "Ada Lovelace",
"picture" =>
"https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
"sub" => "353690423699814251281"
}
} = provider.adapter_state
assert expires_at
end
test "does not crash when endpoint it not available", %{
bypass: bypass
} do
Bypass.down(bypass)
assert refresh_access_tokens(%{}) == :ok
end
end
describe "sync_directory/1" do
describe "execute/1" do
setup do
account = Fixtures.Accounts.create_account()
@@ -98,7 +21,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
test "returns error when IdP sync is not enabled", %{account: account, provider: provider} do
{:ok, _account} = Domain.Accounts.update_account(account, %{features: %{idp_sync: false}})
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -160,7 +84,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
MicrosoftEntraDirectory.mock_group_members_list_endpoint(bypass, group["id"], members)
end)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
groups = Actors.Group |> Repo.all()
assert length(groups) == 2
@@ -206,7 +131,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
Bypass.down(bypass)
MicrosoftEntraDirectory.override_endpoint_url("http://localhost:#{bypass.port}/")
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert Repo.aggregate(Actors.Group, :count) == 0
end
@@ -240,7 +166,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
MicrosoftEntraDirectory.mock_groups_list_endpoint(bypass, [])
MicrosoftEntraDirectory.mock_users_list_endpoint(bypass, users)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_identity =
Repo.get(Domain.Auth.Identity, identity.id)
@@ -407,7 +334,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
one_member
)
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_group = Repo.get(Domain.Actors.Group, group.id)
assert updated_group.name == "Group:All"
@@ -506,7 +434,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -522,7 +451,8 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
{:ok, pid} = Task.Supervisor.start_link()
assert execute(%{task_supervisor: pid}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at

View File

@@ -0,0 +1,81 @@
defmodule Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokensTest do
  use Domain.DataCase, async: true
  import Domain.Auth.Adapters.Okta.Jobs.RefreshAccessTokens

  describe "execute/1" do
    setup do
      # Create an account with an Okta provider whose adapter state carries a
      # refresh token expiring in 15 minutes, so the refresh job picks the
      # provider up for a token refresh.
      account = Fixtures.Accounts.create_account()

      {provider, bypass} =
        Fixtures.Auth.start_and_create_okta_provider(account: account)

      provider =
        Domain.Fixture.update!(provider, %{
          adapter_state: %{
            "access_token" => "OIDC_ACCESS_TOKEN",
            "refresh_token" => "OIDC_REFRESH_TOKEN",
            "expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
            "claims" => "openid email profile offline_access"
          }
        })

      identity = Fixtures.Auth.create_identity(account: account, provider: provider)

      %{
        bypass: bypass,
        account: account,
        provider: provider,
        identity: identity
      }
    end

    test "refreshes the access token", %{
      provider: provider,
      identity: identity,
      bypass: bypass
    } do
      {token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)

      # Stub the IdP token endpoint: it returns a fresh access token and a new
      # refresh token (the job is expected to keep the original one).
      Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
        "token_type" => "Bearer",
        "id_token" => token,
        "access_token" => "MY_ACCESS_TOKEN",
        "refresh_token" => "OTHER_REFRESH_TOKEN",
        "expires_in" => nil
      })

      Mocks.OpenIDConnect.expect_userinfo(bypass)

      assert execute(%{}) == :ok

      provider = Repo.get!(Domain.Auth.Provider, provider.id)

      # Adapter state is updated with the new access token and userinfo, while
      # the stored refresh token stays "OIDC_REFRESH_TOKEN" — the new
      # "OTHER_REFRESH_TOKEN" from the response is not persisted.
      assert %{
               "access_token" => "MY_ACCESS_TOKEN",
               "claims" => ^claims,
               "expires_at" => expires_at,
               "refresh_token" => "OIDC_REFRESH_TOKEN",
               "userinfo" => %{
                 "email" => "ada@example.com",
                 "email_verified" => true,
                 "family_name" => "Lovelace",
                 "given_name" => "Ada",
                 "locale" => "en",
                 "name" => "Ada Lovelace",
                 "picture" =>
                   "https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
                 "sub" => "353690423699814251281"
               }
             } = provider.adapter_state

      # "expires_in" was nil in the mocked response, but the job must still
      # set some expiration on the refreshed state.
      assert expires_at
    end

    test "does not crash when endpoint is not available", %{
      bypass: bypass
    } do
      # Shut down the mocked IdP endpoint; the job should swallow the
      # connection failure and still return :ok.
      Bypass.down(bypass)

      assert execute(%{}) == :ok
    end
  end
end

View File

@@ -1,87 +1,10 @@
defmodule Domain.Auth.Adapters.Okta.JobsTest do
defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors}
alias Domain.Mocks.OktaDirectory
import Domain.Auth.Adapters.Okta.Jobs
import Domain.Auth.Adapters.Okta.Jobs.SyncDirectory
describe "refresh_access_tokens/1" do
setup do
account = Fixtures.Accounts.create_account()
{provider, bypass} =
Fixtures.Auth.start_and_create_okta_provider(account: account)
provider =
Domain.Fixture.update!(provider, %{
adapter_state: %{
"access_token" => "OIDC_ACCESS_TOKEN",
"refresh_token" => "OIDC_REFRESH_TOKEN",
"expires_at" => DateTime.utc_now() |> DateTime.add(15, :minute),
"claims" => "openid email profile offline_access"
}
})
identity = Fixtures.Auth.create_identity(account: account, provider: provider)
%{
bypass: bypass,
account: account,
provider: provider,
identity: identity
}
end
test "refreshes the access token", %{
provider: provider,
identity: identity,
bypass: bypass
} do
{token, claims} = Mocks.OpenIDConnect.generate_openid_connect_token(provider, identity)
Mocks.OpenIDConnect.expect_refresh_token(bypass, %{
"token_type" => "Bearer",
"id_token" => token,
"access_token" => "MY_ACCESS_TOKEN",
"refresh_token" => "OTHER_REFRESH_TOKEN",
"expires_in" => nil
})
Mocks.OpenIDConnect.expect_userinfo(bypass)
assert refresh_access_tokens(%{}) == :ok
provider = Repo.get!(Domain.Auth.Provider, provider.id)
assert %{
"access_token" => "MY_ACCESS_TOKEN",
"claims" => ^claims,
"expires_at" => expires_at,
"refresh_token" => "OIDC_REFRESH_TOKEN",
"userinfo" => %{
"email" => "ada@example.com",
"email_verified" => true,
"family_name" => "Lovelace",
"given_name" => "Ada",
"locale" => "en",
"name" => "Ada Lovelace",
"picture" =>
"https://lh3.googleusercontent.com/-XdUIqdMkCWA/AAAAAAAAAAI/AAAAAAAAAAA/4252rscbv5M/photo.jpg",
"sub" => "353690423699814251281"
}
} = provider.adapter_state
assert expires_at
end
test "does not crash when endpoint is not available", %{
bypass: bypass
} do
Bypass.down(bypass)
assert refresh_access_tokens(%{}) == :ok
end
end
describe "sync_directory/1" do
describe "execute/1" do
setup do
account = Fixtures.Accounts.create_account()
@@ -306,7 +229,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
OktaDirectory.mock_group_members_list_endpoint(bypass, group["id"], members)
end)
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
groups = Actors.Group |> Repo.all()
assert length(groups) == 2
@@ -350,7 +273,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
test "does not crash on endpoint errors", %{bypass: bypass} do
Bypass.down(bypass)
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
assert Repo.aggregate(Actors.Group, :count) == 0
end
@@ -400,7 +323,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
OktaDirectory.mock_groups_list_endpoint(bypass, [])
OktaDirectory.mock_users_list_endpoint(bypass, users)
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
assert updated_identity =
Repo.get(Domain.Auth.Identity, identity.id)
@@ -729,7 +652,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
one_member
)
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
assert updated_group = Repo.get(Domain.Actors.Group, group.id)
assert updated_group.name == "Group:Engineering"
@@ -821,7 +744,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at
@@ -837,7 +760,7 @@ defmodule Domain.Auth.Adapters.Okta.JobsTest do
end)
end
assert sync_directory(%{}) == :ok
assert execute(%{}) == :ok
assert updated_provider = Repo.get(Domain.Auth.Provider, provider.id)
refute updated_provider.last_synced_at

View File

@@ -1513,7 +1513,7 @@ defmodule Domain.AuthTest do
end
end
describe "sync_provider_identities_multi/2" do
describe "sync_provider_identities/2" do
setup do
account = Fixtures.Accounts.create_account()
@@ -1544,16 +1544,15 @@ defmodule Domain.AuthTest do
provider_identifiers = Enum.map(attrs_list, & &1["provider_identifier"])
actor_names = Enum.map(attrs_list, & &1["actor"]["name"])
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [],
plan_identities: {insert, [], []},
insert_identities: [_actor1, _actor2],
delete_identities: [],
plan: {insert, [], []},
insert: [_actor1, _actor2],
update: [],
delete: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert Enum.all?(provider_identifiers, &(&1 in insert))
@@ -1608,16 +1607,15 @@ defmodule Domain.AuthTest do
}
]
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [_identity1, _identity2],
plan_identities: {[], update, []},
delete_identities: [],
insert_identities: [],
plan: {[], update, []},
delete: [],
update: [_updated_identity1, _updated_identity2],
insert: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert length(update) == 2
assert identity1.provider_identifier in update
@@ -1659,16 +1657,14 @@ defmodule Domain.AuthTest do
}
]
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [fetched_identity],
plan_identities: {[], ["USER_ID1"], []},
delete_identities: [],
insert_identities: [],
plan: {[], ["USER_ID1"], []},
delete: [],
insert: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert fetched_identity.actor_id == identity.actor_id
assert actor_ids_by_provider_identifier == %{"USER_ID1" => identity.actor_id}
@@ -1695,16 +1691,15 @@ defmodule Domain.AuthTest do
|> Fixtures.Auth.delete_identity()
attrs_list = []
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [fetched_identity],
plan_identities: {[], [], []},
delete_identities: [],
insert_identities: [],
plan: {[], [], []},
delete: [],
insert: [],
actor_ids_by_provider_identifier: %{}
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert fetched_identity.id == identity.id
@@ -1757,16 +1752,14 @@ defmodule Domain.AuthTest do
attrs_list = []
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [_identity1, _identity2],
plan_identities: {[], [], delete},
delete_identities: [deleted_identity1, deleted_identity2],
insert_identities: [],
plan: {[], [], delete},
delete: [deleted_identity1, deleted_identity2],
insert: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert Enum.all?(provider_identifiers, &(&1 in delete))
assert deleted_identity1.provider_identifier in delete
@@ -1805,18 +1798,15 @@ defmodule Domain.AuthTest do
attrs_list = []
multi = sync_provider_identities_multi(provider, attrs_list)
assert Repo.transaction(multi) ==
assert sync_provider_identities(provider, attrs_list) ==
{:ok,
%{
identities: [],
plan_identities: {[], [], []},
delete_identities: [],
insert_identities: [],
update_identities_and_actors: [],
actor_ids_by_provider_identifier: %{},
recalculate_dynamic_groups: []
plan: {[], [], []},
delete: [],
update: [],
insert: [],
actor_ids_by_provider_identifier: %{}
}}
end
@@ -1830,9 +1820,7 @@ defmodule Domain.AuthTest do
}
]
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:error, :insert_identities, changeset, _effects_so_far} = Repo.transaction(multi)
assert {:error, changeset} = sync_provider_identities(provider, attrs_list)
assert errors_on(changeset) == %{
actor: %{
@@ -1876,16 +1864,14 @@ defmodule Domain.AuthTest do
}
]
multi = sync_provider_identities_multi(provider, attrs_list)
assert {:ok,
%{
identities: [_identity1, _identity2],
plan_identities: {[], update, []},
delete_identities: [],
insert_identities: [],
plan: {[], update, []},
delete: [],
insert: [],
actor_ids_by_provider_identifier: actor_ids_by_provider_identifier
}} = Repo.transaction(multi)
}} = sync_provider_identities(provider, attrs_list)
assert length(update) == 2
assert update == ["USER_ID1", "USER_ID1"]

View File

@@ -1,8 +1,8 @@
defmodule Domain.Billing.JobsTest do
defmodule Domain.Billing.Jobs.CheckAccountLimitsTest do
use Domain.DataCase, async: true
import Domain.Billing.Jobs
import Domain.Billing.Jobs.CheckAccountLimits
describe "check_account_limits/1" do
describe "execute/1" do
setup do
account =
Fixtures.Accounts.create_account(
@@ -22,7 +22,7 @@ defmodule Domain.Billing.JobsTest do
test "does nothing when limits are not violated", %{
account: account
} do
assert check_account_limits(%{}) == :ok
assert execute(%{}) == :ok
account = Repo.get!(Domain.Accounts.Account, account.id)
refute account.warning
@@ -55,7 +55,7 @@ defmodule Domain.Billing.JobsTest do
}
})
assert check_account_limits(%{}) == :ok
assert execute(%{}) == :ok
account = Repo.get!(Domain.Accounts.Account, account.id)

View File

@@ -0,0 +1,84 @@
defmodule Domain.Jobs.Executors.ConcurrentTest do
  use Domain.DataCase, async: true
  alias Domain.Fixtures
  import Domain.Jobs.Executors.Concurrent

  # This test module doubles as the job handler: the executor under test calls
  # back into state/1 and execute/1 defined below.

  # Handler init callback: wraps the config so execute/1 can prove the
  # executor passed the state through unchanged.
  def state(config) do
    {:ok, {:state, config}}
  end

  # Handler run callback: reports each execution (with pid and a monotonic
  # timestamp) back to the test process.
  def execute({:state, config}) do
    send(config[:test_pid], {:executed, self(), :erlang.monotonic_time()})
    :ok
  end

  test "executes the handler on the interval" do
    # 25ms interval — expect at least two executions with increasing
    # monotonic timestamps.
    assert {:ok, _pid} = start_link({__MODULE__, 25, test_pid: self()})

    assert_receive {:executed, _pid, time1}, 500
    assert_receive {:executed, _pid, time2}, 500

    assert time1 < time2
  end

  test "delays initial message by the initial_delay" do
    assert {:ok, _pid} =
             start_link({
               __MODULE__,
               25,
               test_pid: self(), initial_delay: 100
             })

    # Nothing should arrive before the 100ms initial delay elapses.
    refute_receive {:executed, _pid, _time}, 50
    assert_receive {:executed, _pid, _time}, 2000
  end

  describe "reject_locked/2" do
    test "returns all rows if none are locked" do
      account1 = Fixtures.Accounts.create_account()
      account2 = Fixtures.Accounts.create_account()
      rows = [account1, account2]

      # checkout/1 pins a single connection so the advisory locks taken by
      # reject_locked/2 live in one session.
      Domain.Repo.checkout(fn ->
        assert reject_locked("accounts", rows) == rows
      end)
    end

    test "does not allow two processes to lock the same rows" do
      account1 = Fixtures.Accounts.create_account()
      account2 = Fixtures.Accounts.create_account()
      rows = [account1, account2]

      test_pid = self()

      # Two concurrent sessions each try to lock the same rows; together they
      # must cover every row exactly once (no row locked by both).
      task1 =
        Task.async(fn ->
          :ok = Ecto.Adapters.SQL.Sandbox.checkout(Domain.Repo)

          Domain.Repo.checkout(fn ->
            rows = reject_locked("accounts", rows)
            send(test_pid, {:locked, rows})
            # Hold the locks long enough for the other task to observe them.
            Process.sleep(300)
          end)
        end)

      task2 =
        Task.async(fn ->
          :ok = Ecto.Adapters.SQL.Sandbox.checkout(Domain.Repo)

          Domain.Repo.checkout(fn ->
            rows = reject_locked("accounts", rows)
            send(test_pid, {:locked, rows})
            Process.sleep(300)
          end)
        end)

      assert_receive {:locked, rows1}
      assert_receive {:locked, rows2}

      # The two lock sets partition the row list: no overlap, no row missed.
      assert length(rows1) + length(rows2) == length(rows)

      # Let the tasks finish on their own without failing the test.
      Task.ignore(task1)
      Task.ignore(task2)
    end
  end
end

View File

@@ -1,14 +1,14 @@
defmodule Domain.Jobs.Executors.GlobalTest do
defmodule Domain.Jobs.Executors.GloballyUniqueTest do
use ExUnit.Case, async: true
import Domain.Jobs.Executors.Global
import Domain.Jobs.Executors.GloballyUnique
def send_test_message(config) do
def execute(config) do
send(config[:test_pid], {:executed, self(), :erlang.monotonic_time()})
:ok
end
test "executes the handler on the interval" do
assert {:ok, _pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
assert {:ok, _pid} = start_link({__MODULE__, 25, test_pid: self()})
assert_receive {:executed, _pid, time1}, 500
assert_receive {:executed, _pid, time2}, 500
@@ -19,25 +19,25 @@ defmodule Domain.Jobs.Executors.GlobalTest do
test "delays initial message by the initial_delay" do
assert {:ok, _pid} =
start_link({
{__MODULE__, :send_test_message},
__MODULE__,
25,
test_pid: self(), initial_delay: 100
})
refute_receive {:executed, _pid, _time}, 50
assert_receive {:executed, _pid, _time}, 1000
assert_receive {:executed, _pid, _time}, 2000
end
test "registers itself as a leader if there is no global name registered" do
assert {:ok, pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
assert_receive {:executed, ^pid, _time}, 1000
name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message}
assert {:ok, pid} = start_link({__MODULE__, 25, test_pid: self()})
assert_receive {:executed, ^pid, _time}, 2000
name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__}
assert :global.whereis_name(name) == pid
assert :sys.get_state(pid) ==
{
{
{__MODULE__, :send_test_message},
__MODULE__,
25,
[test_pid: self()]
},
@@ -47,17 +47,17 @@ defmodule Domain.Jobs.Executors.GlobalTest do
test "other processes register themselves as fallbacks and monitor the leader" do
assert {:ok, leader_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
assert {:ok, fallback1_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
assert {:ok, fallback2_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
assert_receive {:executed, ^leader_pid, _time}, 500
name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message}
name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__}
assert :global.whereis_name(name) == leader_pid
assert {_state, {:fallback, ^leader_pid, _monitor_ref}} = :sys.get_state(fallback1_pid)
@@ -66,13 +66,13 @@ defmodule Domain.Jobs.Executors.GlobalTest do
test "other processes register a new leader when old one is down" do
assert {:ok, leader_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
assert {:ok, fallback1_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
assert {:ok, fallback2_pid} =
start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
start_link({__MODULE__, 25, test_pid: self()})
Process.flag(:trap_exit, true)
Process.exit(leader_pid, :kill)
@@ -91,7 +91,7 @@ defmodule Domain.Jobs.Executors.GlobalTest do
assert {_state, {:fallback, ^new_leader_pid, _monitor_ref}} = :sys.get_state(fallback_pid)
assert {_state, :leader} = :sys.get_state(new_leader_pid)
name = {Domain.Jobs.Executors.Global, __MODULE__, :send_test_message}
name = {Domain.Jobs.Executors.GloballyUnique, __MODULE__}
assert :global.whereis_name(name) == new_leader_pid
assert_receive {:executed, ^new_leader_pid, _time}, 1000

View File

@@ -1,43 +0,0 @@
defmodule Domain.Jobs.RecurrentTest do
  use ExUnit.Case, async: true
  import Domain.Jobs.Recurrent

  # Sample job module exercising the `every/2,3` macro DSL: one handler that
  # takes the config and one that ignores it.
  defmodule TestDefinition do
    use Domain.Jobs.Recurrent, otp_app: :domain
    require Logger

    every seconds(1), :second_test, config do
      send(config[:test_pid], :executed)
    end

    every minutes(5), :minute_test do
      :ok
    end
  end

  describe "seconds/1" do
    test "converts seconds to milliseconds" do
      assert seconds(1) == 1_000
      assert seconds(13) == 13_000
    end
  end

  describe "minutes/1" do
    test "converts minutes to milliseconds" do
      assert minutes(1) == 60_000
      assert minutes(13) == 780_000
    end
  end

  test "defines callbacks" do
    # Each `every` declaration registers a {name, interval_ms} handler and
    # defines a function of the same name.
    assert length(TestDefinition.__handlers__()) == 2
    assert {:minute_test, 300_000} in TestDefinition.__handlers__()
    assert {:second_test, 1000} in TestDefinition.__handlers__()

    assert TestDefinition.minute_test(test_pid: self()) == :ok
    # second_test returns the result of send/2, which is the message itself.
    assert TestDefinition.second_test(test_pid: self()) == :executed
    assert_receive :executed
  end
end

View File

@@ -40,7 +40,7 @@ defmodule Domain.Fixture do
end
def unique_integer do
System.unique_integer([:positive])
System.unique_integer([:positive, :monotonic])
end
def unique_ipv4 do

View File

@@ -270,7 +270,7 @@ locals {
cluster_version_label = "cluster_version"
cluster_version = split(".", local.portal_image_tag)[0]
node_name_label = "application"
polling_interval_ms = 7000
polling_interval_ms = 10000
})
},
{

View File

@@ -401,6 +401,12 @@ module "domain" {
name = "BACKGROUND_JOBS_ENABLED"
value = "true"
},
# Pool size is increased because background jobs are holding
# the connections for a long time
{
name = "DATABASE_POOL_SIZE"
value = "15"
},
], local.shared_application_environment_variables)
application_labels = {