mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
fix(portal): ensure cluster state heals (#9319)
We use `libcluster`, a common Elixir library, for node discovery. It's a very lightweight wrapper around Erlang's standard `Node.connect` functionality. It supports custom cluster formation strategies, and we've implemented one based on fetching the list of nodes from the GCP API, and then attempting to connect to them. Unfortunately, our implementation had two bugs that prevented the cluster from healing in the following two cases: - If we successfully connect to nodes, we tracked an internal state var as having successfully connected to them, forever. If we lost the connection to these nodes (such as during a deploy where the elixir nodes don't come up in time, causing the instance group manager to reap them), then the state would never be updated, and we would never reconnect to the lost nodes. - If we failed to fetch the list of nodes more than 10 times (every 10 seconds, so 100 seconds), then we would fail to schedule the timer to load the nodes again. The first issue is fixed by removing our kept state altogether - this is what libcluster is for. We can simply try to connect to the most recent list of nodes returned from Google's API, and we now log a warning for any nodes that don't connect. The second issue is fixed by always scheduling the timer, forever, regardless of the state of the Google API. Fixes #8660 Fixes #8698
This commit is contained in:
@@ -17,27 +17,18 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
"""
|
||||
use GenServer
|
||||
use Cluster.Strategy
|
||||
alias Cluster.Strategy.State
|
||||
require Logger
|
||||
|
||||
defmodule Meta do
|
||||
@type t :: %{
|
||||
nodes: MapSet.t()
|
||||
}
|
||||
|
||||
defstruct nodes: MapSet.new()
|
||||
end
|
||||
|
||||
@default_polling_interval :timer.seconds(10)
|
||||
|
||||
def start_link(args), do: GenServer.start_link(__MODULE__, args)
|
||||
|
||||
@impl true
|
||||
def init([%State{} = state]) do
|
||||
def init([state]) do
|
||||
unless Domain.GoogleCloudPlatform.enabled?(),
|
||||
do: "Google Cloud Platform clustering strategy requires GoogleCloudPlatform to be enabled"
|
||||
|
||||
{:ok, %{state | meta: %Meta{}}, {:continue, :start}}
|
||||
{:ok, state, {:continue, :start}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
@@ -50,7 +41,7 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
handle_info(:load, state)
|
||||
end
|
||||
|
||||
def handle_info(:load, %State{} = state) do
|
||||
def handle_info(:load, state) do
|
||||
{:noreply, load(state)}
|
||||
end
|
||||
|
||||
@@ -58,53 +49,17 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp load(%State{topology: topology, meta: %Meta{} = meta} = state) do
|
||||
with {:ok, nodes, state} <- fetch_nodes(state) do
|
||||
new_nodes = MapSet.new(nodes)
|
||||
added_nodes = MapSet.difference(new_nodes, meta.nodes)
|
||||
removed_nodes = MapSet.difference(state.meta.nodes, new_nodes)
|
||||
defp load(state) do
|
||||
Process.send_after(self(), :load, polling_interval(state))
|
||||
|
||||
new_nodes =
|
||||
case Cluster.Strategy.disconnect_nodes(
|
||||
topology,
|
||||
state.disconnect,
|
||||
state.list_nodes,
|
||||
MapSet.to_list(removed_nodes)
|
||||
) do
|
||||
:ok ->
|
||||
new_nodes
|
||||
|
||||
{:error, bad_nodes} ->
|
||||
# Add back the nodes which should have been removed_nodes, but which couldn't be for some reason
|
||||
Enum.reduce(bad_nodes, new_nodes, fn {n, _}, acc ->
|
||||
MapSet.put(acc, n)
|
||||
end)
|
||||
end
|
||||
|
||||
new_nodes =
|
||||
case Cluster.Strategy.connect_nodes(
|
||||
topology,
|
||||
state.connect,
|
||||
state.list_nodes,
|
||||
MapSet.to_list(added_nodes)
|
||||
) do
|
||||
:ok ->
|
||||
new_nodes
|
||||
|
||||
{:error, bad_nodes} ->
|
||||
# Remove the nodes which should have been added_nodes, but couldn't be for some reason
|
||||
Enum.reduce(bad_nodes, new_nodes, fn {n, _}, acc ->
|
||||
MapSet.delete(acc, n)
|
||||
end)
|
||||
end
|
||||
|
||||
Process.send_after(self(), :load, polling_interval(state))
|
||||
|
||||
%State{state | meta: %{state.meta | nodes: new_nodes}}
|
||||
with {:ok, nodes, state} <- fetch_nodes(state),
|
||||
:ok <-
|
||||
Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, nodes) do
|
||||
state
|
||||
else
|
||||
{:error, reason} ->
|
||||
Logger.error("Can't fetch list of nodes: #{inspect(reason)}",
|
||||
module: __MODULE__
|
||||
Logger.error("Error fetching nodes or connecting to them",
|
||||
reason: inspect(reason)
|
||||
)
|
||||
|
||||
state
|
||||
@@ -120,8 +75,8 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
{:ok, nodes, state}
|
||||
else
|
||||
{:error, %{"error" => %{"code" => 401}} = reason} ->
|
||||
Logger.error("Invalid access token was used: #{inspect(reason)}",
|
||||
module: __MODULE__
|
||||
Logger.error("Invalid access token was used",
|
||||
reason: inspect(reason)
|
||||
)
|
||||
|
||||
if remaining_retry_count == 0 do
|
||||
@@ -133,15 +88,13 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
{:error, reason} ->
|
||||
if remaining_retry_count == 0 do
|
||||
Logger.error("Can't fetch list of nodes or access token",
|
||||
reason: inspect(reason),
|
||||
module: __MODULE__
|
||||
reason: inspect(reason)
|
||||
)
|
||||
|
||||
{:error, reason}
|
||||
else
|
||||
Logger.info("Can't fetch list of nodes or access token",
|
||||
reason: inspect(reason),
|
||||
module: __MODULE__
|
||||
reason: inspect(reason)
|
||||
)
|
||||
|
||||
backoff_interval = Keyword.get(state.config, :backoff_interval, 1_000)
|
||||
@@ -189,7 +142,7 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
end
|
||||
end
|
||||
|
||||
defp polling_interval(%State{config: config}) do
|
||||
Keyword.get(config, :polling_interval, @default_polling_interval)
|
||||
defp polling_interval(state) do
|
||||
Keyword.get(state.config, :polling_interval, @default_polling_interval)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do
|
||||
use ExUnit.Case, async: true
|
||||
import Domain.Cluster.GoogleComputeLabelsStrategy
|
||||
alias Domain.Cluster.GoogleComputeLabelsStrategy.Meta
|
||||
alias Cluster.Strategy.State
|
||||
alias Domain.Mocks.GoogleCloudPlatform
|
||||
|
||||
describe "fetch_nodes/1" do
|
||||
@@ -11,8 +9,7 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do
|
||||
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
|
||||
GoogleCloudPlatform.mock_instances_list_endpoint(bypass)
|
||||
|
||||
state = %State{
|
||||
meta: %Meta{},
|
||||
state = %{
|
||||
config: [
|
||||
project_id: "firezone-staging",
|
||||
cluster_name: "firezone",
|
||||
|
||||
Reference in New Issue
Block a user