diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index 91a8c0fa5..ece36c7dc 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -221,7 +221,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.reuse_connection", attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, _flow} <- + {:ok, resource, flow} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -239,6 +239,7 @@ defmodule API.Client.Channel do %{ client_id: socket.assigns.client.id, resource_id: resource.id, + flow_id: flow.id, authorization_expires_at: socket.assigns.subject.expires_at }, {opentelemetry_ctx, opentelemetry_span_ctx}} ) @@ -273,7 +274,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.with_span "client.request_connection", ctx_attrs do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - {:ok, resource, _flow} <- + {:ok, resource, flow} <- Flows.authorize_flow( socket.assigns.client, gateway, @@ -291,6 +292,7 @@ defmodule API.Client.Channel do %{ client_id: socket.assigns.client.id, resource_id: resource.id, + flow_id: flow.id, authorization_expires_at: socket.assigns.subject.expires_at, client_rtc_session_description: client_rtc_session_description, client_preshared_key: preshared_key diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index b807cd62e..2bbf3c63d 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Clients, Resources, Relays, Gateways} + alias Domain.{Clients, Resources, Relays, Gateways, Flows} require Logger require OpenTelemetry.Tracer @@ -83,6 +83,7 @@ defmodule API.Gateway.Channel do %{ client_id: client_id, resource_id: resource_id, + flow_id: flow_id, authorization_expires_at: authorization_expires_at } = attrs @@ -90,6 +91,7 @@ defmodule API.Gateway.Channel do push(socket, "allow_access", %{ client_id: client_id, + flow_id: flow_id, resource: Views.Resource.render(resource), expires_at: DateTime.to_unix(authorization_expires_at, :second) }) @@ -130,6 +132,7 @@ defmodule API.Gateway.Channel do %{ client_id: client_id, resource_id: resource_id, + flow_id: flow_id, authorization_expires_at: authorization_expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key @@ -148,6 +151,7 @@ defmodule API.Gateway.Channel do push(socket, "request_connection", %{ ref: ref, + flow_id: flow_id, actor: Views.Actor.render(client.actor), relays: Views.Relay.render_many(relays, authorization_expires_at), resource: Views.Resource.render(resource), @@ -158,6 +162,7 @@ defmodule API.Gateway.Channel do Logger.debug("Awaiting gateway connection_ready message", client_id: client_id, resource_id: resource_id, + flow_id: flow_id, ref: ref ) @@ -236,21 +241,45 @@ defmodule API.Gateway.Channel do end end - # def handle_in("metrics", params, socket) do - # %{ - # "started_at" => started_at, - # "ended_at" => ended_at, - # "metrics" => [ - # %{ - # "client_id" => client_id, - # "resource_id" => resource_id, - # "rx_bytes" => 0, - # "tx_packets" => 0 - # } - # ] - # } + def handle_in( + "metrics", + %{ + "started_at" => started_at, + "ended_at" => ended_at, + "metrics" => metrics + }, + socket + ) do + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) - # :ok = Gateways.update_metrics(socket.assigns.relay, metrics) - # {:noreply, socket} - # end + OpenTelemetry.Tracer.with_span "gateway.metrics" do + window_started_at = DateTime.from_unix!(started_at, :second) + window_ended_at = DateTime.from_unix!(ended_at, :second) + + activities = + Enum.map(metrics, fn metric -> + %{ + "flow_id" => flow_id, + "destination" => destination, + "rx_bytes" => rx_bytes, + "tx_bytes" => tx_bytes + } = metric + + %{ + window_started_at: window_started_at, + window_ended_at: window_ended_at, + destination: destination, + rx_bytes: rx_bytes, + tx_bytes: tx_bytes, + flow_id: flow_id, + account_id: socket.assigns.gateway.account_id + } + end) + + {:ok, _num} = Flows.upsert_activities(activities) + + {:reply, :ok, socket} + end + end end diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index dfe55e8fc..960480acf 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -72,6 +72,7 @@ defmodule API.Gateway.ChannelTest do } do expires_at = DateTime.utc_now() |> DateTime.add(30, :second) otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} + flow_id = Ecto.UUID.generate() stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -82,6 +83,7 @@ defmodule API.Gateway.ChannelTest do %{ client_id: client.id, resource_id: resource.id, + flow_id: flow_id, authorization_expires_at: expires_at }, otel_ctx} ) @@ -102,6 +104,7 @@ defmodule API.Gateway.ChannelTest do ] } + assert payload.flow_id == flow_id assert payload.client_id == client.id assert DateTime.from_unix!(payload.expires_at) == DateTime.truncate(expires_at, :second) end @@ -142,6 +145,7 @@ defmodule API.Gateway.ChannelTest do expires_at = DateTime.utc_now() |> DateTime.add(30, :second) preshared_key = "PSK" rtc_session_description = "RTC_SD" + flow_id = Ecto.UUID.generate() otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} @@ -154,6 +158,7 @@ defmodule API.Gateway.ChannelTest do %{ client_id: client.id, resource_id: resource.id, + flow_id: flow_id, authorization_expires_at: expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key @@ -163,6 +168,7 @@ defmodule API.Gateway.ChannelTest do assert_push "request_connection", payload assert is_binary(payload.ref) + assert payload.flow_id == flow_id assert payload.actor == %{id: client.actor_id} ipv4_stun_uri = "stun:#{relay.ipv4}:#{relay.port}" @@ -246,6 +252,7 @@ defmodule API.Gateway.ChannelTest do preshared_key = "PSK" gateway_public_key = gateway.public_key rtc_session_description = "RTC_SD" + flow_id = Ecto.UUID.generate() otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} @@ -259,12 +266,13 @@ defmodule API.Gateway.ChannelTest do client_id: client.id, resource_id: resource.id, authorization_expires_at: expires_at, + flow_id: flow_id, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key }, otel_ctx} ) - assert_push "request_connection", %{ref: ref} + assert_push "request_connection", %{ref: ref, flow_id: ^flow_id} push_ref = push(socket, "connection_ready", %{ @@ -317,4 +325,55 @@ defmodule API.Gateway.ChannelTest do assert gateway.id == gateway_id end end + + describe "handle_in/3 metrics" do + test "inserts activities", %{ + account: account, + subject: subject, + client: client, + gateway: gateway, + resource: resource, + socket: socket + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + resource: resource, + gateway: gateway + ) + + now = DateTime.utc_now() |> DateTime.truncate(:second) + one_minute_ago = DateTime.add(now, -1, :minute) + + {:ok, destination} = Domain.Types.IPPort.cast("127.0.0.1:80") + + attrs = + %{ + "started_at" => DateTime.to_unix(one_minute_ago), + "ended_at" => DateTime.to_unix(now), + "metrics" => [ + %{ + "flow_id" => flow.id, + "destination" => destination, + "rx_bytes" => 100, + "tx_bytes" => 200 + } + ] + } + + push_ref = push(socket, "metrics", attrs) + assert_reply push_ref, :ok + + assert upserted_activity = Repo.one(Domain.Flows.Activity) + assert upserted_activity.window_started_at == one_minute_ago + assert upserted_activity.window_ended_at == now + assert upserted_activity.destination == destination + assert upserted_activity.rx_bytes == 100 + assert upserted_activity.tx_bytes == 200 + assert upserted_activity.flow_id == flow.id + assert upserted_activity.account_id == account.id + end + end end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index d02d1e436..f03b450da 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -1,7 +1,7 @@ defmodule Domain.Flows do - alias Domain.Repo - alias Domain.{Auth, Clients, Gateways, Resources, Policies} - alias Domain.Flows.{Authorizer, Flow} + alias Domain.{Repo, Validator} + alias Domain.{Auth, Accounts, Clients, Gateways, Resources, Policies} + alias Domain.Flows.{Authorizer, Flow, Activity} require Ecto.Query def authorize_flow( @@ -49,35 +49,53 @@ defmodule Domain.Flows do end end + def fetch_flow_by_id(id, %Auth.Subject{} = subject, opts \\ []) do + with :ok <- Auth.ensure_has_permissions(subject, Authorizer.view_flows_permission()), + true <- Validator.valid_uuid?(id) do + {preload, _opts} = Keyword.pop(opts, :preload, []) + + Flow.Query.by_id(id) + |> Authorizer.for_subject(Flow, subject) + |> Repo.fetch() + |> case do + {:ok, resource} -> {:ok, Repo.preload(resource, preload)} + {:error, reason} -> {:error, reason} + end + else + false -> {:error, :not_found} + other -> other + end + end + def list_flows_for(assoc, subject, opts \\ []) def list_flows_for(%Policies.Policy{} = policy, %Auth.Subject{} = subject, opts) do Flow.Query.by_policy_id(policy.id) - |> list(subject, opts) + |> list_flows(subject, opts) end def list_flows_for(%Resources.Resource{} = resource, %Auth.Subject{} = subject, opts) do Flow.Query.by_resource_id(resource.id) - |> list(subject, opts) + |> list_flows(subject, opts) end def list_flows_for(%Clients.Client{} = client, %Auth.Subject{} = subject, opts) do Flow.Query.by_client_id(client.id) - |> list(subject, opts) + |> list_flows(subject, opts) end def list_flows_for(%Gateways.Gateway{} = gateway, %Auth.Subject{} = subject, opts) do Flow.Query.by_gateway_id(gateway.id) - |> list(subject, opts) + |> list_flows(subject, opts) end - defp list(queryable, subject, opts) do + defp list_flows(queryable, subject, opts) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.view_flows_permission()) do {preload, _opts} = Keyword.pop(opts, :preload, []) {:ok, flows} = queryable - |> Authorizer.for_subject(subject) + |> Authorizer.for_subject(Flow, subject) |> Ecto.Query.order_by([flows: flows], desc: flows.inserted_at, desc: flows.id) |> Ecto.Query.limit(50) |> Repo.list() @@ -85,4 +103,42 @@ defmodule Domain.Flows do {:ok, Repo.preload(flows, preload)} end end + + def upsert_activities(activities) do + {num, _} = + Repo.insert_all(Activity, activities, on_conflict: :nothing) + + {:ok, num} + end + + def list_flow_activities_for( + %Flow{} = flow, + ended_after, + started_before, + %Auth.Subject{} = subject + ) do + Activity.Query.by_flow_id(flow.id) + |> list_activities(ended_after, started_before, subject) + end + + def list_flow_activities_for( + %Accounts.Account{} = account, + ended_after, + started_before, + %Auth.Subject{} = subject + ) do + Activity.Query.by_account_id(account.id) + |> list_activities(ended_after, started_before, subject) + end + + defp list_activities(queryable, ended_after, started_before, subject) do + with :ok <- Auth.ensure_has_permissions(subject, Authorizer.view_flows_permission()) do + queryable + |> Activity.Query.by_window_ended_at({:greater_than, ended_after}) + |> Activity.Query.by_window_started_at({:less_than, started_before}) + |> Authorizer.for_subject(Activity, subject) + |> Ecto.Query.order_by([activities: activities], asc: activities.window_started_at) + |> Repo.list() + end + end end diff --git a/elixir/apps/domain/lib/domain/flows/activity.ex b/elixir/apps/domain/lib/domain/flows/activity.ex new file mode 100644 index 000000000..60417cdb4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/flows/activity.ex @@ -0,0 +1,15 @@ +defmodule Domain.Flows.Activity do + use Domain, :schema + + schema "flow_activities" do + field :window_started_at, :utc_datetime + field :window_ended_at, :utc_datetime + + field :destination, Domain.Types.IPPort + field :rx_bytes, :integer + field :tx_bytes, :integer + + belongs_to :flow, Domain.Flows.Flow + belongs_to :account, Domain.Accounts.Account + end +end diff --git a/elixir/apps/domain/lib/domain/flows/activity/query.ex b/elixir/apps/domain/lib/domain/flows/activity/query.ex new file mode 100644 index 000000000..1eb6bbd55 --- /dev/null +++ b/elixir/apps/domain/lib/domain/flows/activity/query.ex @@ -0,0 +1,23 @@ +defmodule Domain.Flows.Activity.Query do + use Domain, :query + + def all do + from(activities in Domain.Flows.Activity, as: :activities) + end + + def by_account_id(queryable \\ all(), account_id) do + where(queryable, [activities: activities], activities.account_id == ^account_id) + end + + def by_flow_id(queryable \\ all(), flow_id) do + where(queryable, [activities: activities], activities.flow_id == ^flow_id) + end + + def by_window_started_at(queryable \\ all(), {:less_than, datetime}) do + where(queryable, [activities: activities], activities.window_started_at < ^datetime) + end + + def by_window_ended_at(queryable \\ all(), {:greater_than, datetime}) do + where(queryable, [activities: activities], activities.window_ended_at > ^datetime) + end +end diff --git a/elixir/apps/domain/lib/domain/flows/authorizer.ex b/elixir/apps/domain/lib/domain/flows/authorizer.ex index 784840e24..090a6eaf5 100644 --- a/elixir/apps/domain/lib/domain/flows/authorizer.ex +++ b/elixir/apps/domain/lib/domain/flows/authorizer.ex @@ -1,6 +1,6 @@ defmodule Domain.Flows.Authorizer do use Domain.Auth.Authorizer - alias Domain.Flows.Flow + alias Domain.Flows.{Flow, Activity} def view_flows_permission, do: build(Flow, :view) def create_flows_permission, do: build(Flow, :create) @@ -23,11 +23,17 @@ defmodule Domain.Flows.Authorizer do [] end - @impl Domain.Auth.Authorizer - def for_subject(queryable, %Subject{} = subject) do + def for_subject(queryable, Flow, %Subject{} = subject) do cond do has_permission?(subject, view_flows_permission()) -> Flow.Query.by_account_id(queryable, subject.account.id) end end + + def for_subject(queryable, Activity, %Subject{} = subject) do + cond do + has_permission?(subject, view_flows_permission()) -> + Activity.Query.by_account_id(queryable, subject.account.id) + end + end end diff --git a/elixir/apps/domain/lib/domain/resources.ex b/elixir/apps/domain/lib/domain/resources.ex index afd68bd02..a7c58d894 100644 --- a/elixir/apps/domain/lib/domain/resources.ex +++ b/elixir/apps/domain/lib/domain/resources.ex @@ -29,11 +29,11 @@ defmodule Domain.Resources do end def fetch_and_authorize_resource_by_id(id, %Auth.Subject{} = subject, opts \\ []) do - {preload, _opts} = Keyword.pop(opts, :preload, []) - with :ok <- Auth.ensure_has_permissions(subject, Authorizer.view_available_resources_permission()), true <- Validator.valid_uuid?(id) do + {preload, _opts} = Keyword.pop(opts, :preload, []) + Resource.Query.by_id(id) |> Resource.Query.by_account_id(subject.account.id) |> Resource.Query.by_authorized_actor_id(subject.actor.id) diff --git a/elixir/apps/domain/lib/domain/types/ip_port.ex b/elixir/apps/domain/lib/domain/types/ip_port.ex index e6e932167..7b5e78581 100644 --- a/elixir/apps/domain/lib/domain/types/ip_port.ex +++ b/elixir/apps/domain/lib/domain/types/ip_port.ex @@ -65,8 +65,12 @@ defmodule Domain.Types.IPPort do def dump(_), do: :error - def load(%__MODULE__{} = ip) do - {:ok, ip} + def load(binary) when is_binary(binary) do + cast(binary) + end + + def load(%__MODULE__{} = struct) do + {:ok, struct} end def load(_), do: :error diff --git a/elixir/apps/domain/priv/repo/migrations/20230927155748_add_flow_activities.exs b/elixir/apps/domain/priv/repo/migrations/20230927155748_add_flow_activities.exs new file mode 100644 index 000000000..c0f1e3728 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20230927155748_add_flow_activities.exs @@ -0,0 +1,36 @@ +defmodule Domain.Repo.Migrations.AddFlowActivities do + use Ecto.Migration + + @assoc_opts [type: :binary_id, on_delete: :delete_all] + + def change do + create table(:flow_activities, primary_key: false) do + add(:id, :uuid, primary_key: true) + + add(:window_started_at, :utc_datetime_usec, null: false) + add(:window_ended_at, :utc_datetime_usec, null: false) + + add(:destination, :string, null: false) + add(:rx_bytes, :bigint, null: false) + add(:tx_bytes, :bigint, null: false) + + add(:flow_id, references(:flows, @assoc_opts), null: false) + add(:account_id, references(:accounts, @assoc_opts), null: false) + end + + execute(""" + CREATE UNIQUE INDEX flow_activities_account_id_flow_id_window_destination_index ON flow_activities + USING BTREE (account_id, flow_id, window_started_at, window_ended_at, destination); + """) + + execute(""" + CREATE INDEX flow_activities_account_id_flow_id_window_index ON flow_activities + USING BTREE (account_id, flow_id, window_started_at ASC); + """) + + execute(""" + CREATE INDEX flow_activities_account_id_window_index ON flow_activities + USING BTREE (account_id, window_started_at ASC); + """) + end +end diff --git a/elixir/apps/domain/priv/repo/seeds.exs b/elixir/apps/domain/priv/repo/seeds.exs index e1f7d09a7..445998125 100644 --- a/elixir/apps/domain/priv/repo/seeds.exs +++ b/elixir/apps/domain/priv/repo/seeds.exs @@ -457,9 +457,45 @@ IO.puts("Created client tokens:") IO.puts(" #{unprivileged_actor_email} token: #{unprivileged_subject_client_token}") IO.puts("") -Flows.authorize_flow( - user_iphone, - gateway1, - cidr_resource.id, - unprivileged_subject -) +{:ok, _resource, flow} = + Flows.authorize_flow( + user_iphone, + gateway1, + cidr_resource.id, + unprivileged_subject + ) + +started_at = + DateTime.utc_now() + |> DateTime.truncate(:second) + |> DateTime.add(5, :minute) + +{:ok, destination1} = Domain.Types.IPPort.cast("142.250.217.142:443") +{:ok, destination2} = Domain.Types.IPPort.cast("142.250.217.142:80") + +random_integer = fn -> + :math.pow(10, 10) + |> round() + |> :rand.uniform() + |> floor() + |> Kernel.-(1) +end + +activities = + for i <- 1..200 do + offset = i * 15 + started_at = DateTime.add(started_at, offset, :minute) + ended_at = DateTime.add(started_at, 15, :minute) + + %{ + window_started_at: started_at, + window_ended_at: ended_at, + destination: Enum.random([destination1, destination2]), + rx_bytes: random_integer.(), + tx_bytes: random_integer.(), + flow_id: flow.id, + account_id: account.id + } + end + +{:ok, 200} = Flows.upsert_activities(activities) diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 807584261..77159268a 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -1222,7 +1222,7 @@ defmodule Domain.ActorsTest do {:ok, actor} = fetch_actor_by_id(actor.id, subject, preload: :identities) - assert Ecto.assoc_loaded?(actor.identities) == true + assert Ecto.assoc_loaded?(actor.identities) end end @@ -1317,7 +1317,7 @@ defmodule Domain.ActorsTest do {:ok, actors} = list_actors(subject, preload: :identities) assert length(actors) == 2 - assert Enum.all?(actors, fn a -> Ecto.assoc_loaded?(a.identities) end) == true + assert Enum.all?(actors, fn a -> Ecto.assoc_loaded?(a.identities) end) end end diff --git a/elixir/apps/domain/test/domain/flows_test.exs b/elixir/apps/domain/test/domain/flows_test.exs index 867a208ea..95c2d722a 100644 --- a/elixir/apps/domain/test/domain/flows_test.exs +++ b/elixir/apps/domain/test/domain/flows_test.exs @@ -2,6 +2,7 @@ defmodule Domain.FlowsTest do use Domain.DataCase, async: true import Domain.Flows alias Domain.Flows + alias Domain.Flows.Authorizer setup do account = Fixtures.Accounts.create_account() @@ -181,11 +182,95 @@ defmodule Domain.FlowsTest do assert {:ok, resource, _flow} = authorize_flow(client, gateway, resource.id, subject, preload: :connections) - assert Ecto.assoc_loaded?(resource.connections) == true + assert Ecto.assoc_loaded?(resource.connections) + assert Ecto.assoc_loaded?(resource.connections) + assert Ecto.assoc_loaded?(resource.connections) + assert Ecto.assoc_loaded?(resource.connections) assert length(resource.connections) == 1 end end + describe "fetch_flow_by_id/2" do + test "returns error when flow does not exist", %{subject: subject} do + assert fetch_flow_by_id(Ecto.UUID.generate(), subject) == {:error, :not_found} + end + + test "returns error when UUID is invalid", %{subject: subject} do + assert fetch_flow_by_id("foo", subject) == {:error, :not_found} + end + + test "returns flow", %{ + account: account, + client: client, + gateway: gateway, + resource: resource, + policy: policy, + subject: subject + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + policy: policy, + resource: resource, + gateway: gateway + ) + + assert {:ok, fetched_flow} = fetch_flow_by_id(flow.id, subject) + assert fetched_flow.id == flow.id + end + + test "does not return flows in other accounts", %{subject: subject} do + flow = Fixtures.Flows.create_flow() + assert fetch_flow_by_id(flow.id, subject) == {:error, :not_found} + end + + test "returns error when subject has no permission to view flows", %{subject: subject} do + subject = Fixtures.Auth.remove_permissions(subject) + + assert fetch_flow_by_id(Ecto.UUID.generate(), subject) == + {:error, + {:unauthorized, [missing_permissions: [Authorizer.view_flows_permission()]]}} + end + + test "associations are preloaded when opts given", %{ + account: account, + client: client, + gateway: gateway, + resource: resource, + policy: policy, + subject: subject + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + policy: policy, + resource: resource, + gateway: gateway + ) + + assert {:ok, flow} = + fetch_flow_by_id(flow.id, subject, + preload: [ + :policy, + :client, + :gateway, + :resource, + :account + ] + ) + + assert Ecto.assoc_loaded?(flow.policy) + assert Ecto.assoc_loaded?(flow.client) + assert Ecto.assoc_loaded?(flow.gateway) + assert Ecto.assoc_loaded?(flow.resource) + assert Ecto.assoc_loaded?(flow.account) + end + end + describe "list_flows_for/2" do test "returns empty list when there are no flows", %{ client: client, @@ -215,7 +300,7 @@ defmodule Domain.FlowsTest do assert list_flows_for(gateway, subject) == {:ok, []} end - test "returns all authorized resources for account user subject", %{ + test "returns all authorized flows", %{ account: account, client: client, gateway: gateway, @@ -258,4 +343,232 @@ defmodule Domain.FlowsTest do assert list_flows_for(gateway, subject) == expected_error end end + + describe "upsert_activities/1" do + test "inserts new activities", %{ + account: account, + client: client, + gateway: gateway, + resource: resource, + policy: policy, + subject: subject + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + policy: policy, + resource: resource, + gateway: gateway + ) + + now = DateTime.utc_now() |> DateTime.truncate(:second) + + {:ok, destination} = Domain.Types.IPPort.cast("127.0.0.1:80") + + activity = + %{ + window_started_at: DateTime.add(now, -1, :minute), + window_ended_at: now, + destination: destination, + rx_bytes: 100, + tx_bytes: 200, + flow_id: flow.id, + account_id: account.id + } + + assert upsert_activities([activity]) == {:ok, 1} + + assert upserted_activity = Repo.one(Flows.Activity) + assert upserted_activity.window_started_at == activity.window_started_at + assert upserted_activity.window_ended_at == activity.window_ended_at + assert upserted_activity.destination == destination + assert upserted_activity.rx_bytes == activity.rx_bytes + assert upserted_activity.tx_bytes == activity.tx_bytes + assert upserted_activity.flow_id == flow.id + assert upserted_activity.account_id == account.id + end + + test "ignores upsert conflicts", %{ + account: account, + client: client, + gateway: gateway, + resource: resource, + policy: policy, + subject: subject + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + policy: policy, + resource: resource, + gateway: gateway + ) + + activity = Fixtures.Flows.activity_attrs(flow_id: flow.id, account_id: account.id) + + assert upsert_activities([activity]) == {:ok, 1} + assert upsert_activities([activity]) == {:ok, 0} + + assert Repo.one(Flows.Activity) + end + end + + describe "list_flow_activities_for/4" do + setup %{ + account: account, + client: client, + gateway: gateway, + resource: resource, + policy: policy, + subject: subject + } do + flow = + Fixtures.Flows.create_flow( + account: account, + subject: subject, + client: client, + policy: policy, + resource: resource, + gateway: gateway + ) + + %{flow: flow} + end + + test "returns empty list when there are no flow activities", %{ + account: account, + flow: flow, + subject: subject + } do + now = DateTime.utc_now() + ended_after = DateTime.add(now, -30, :minute) + started_before = DateTime.add(now, 30, :minute) + + assert list_flow_activities_for(account, ended_after, started_before, subject) == {:ok, []} + assert list_flow_activities_for(flow, ended_after, started_before, subject) == {:ok, []} + end + + test "does not list flow activities from other accounts", %{ + account: account, + subject: subject + } do + flow = Fixtures.Flows.create_flow() + Fixtures.Flows.create_activity(flow: flow) + + now = DateTime.utc_now() + ended_after = DateTime.add(now, -30, :minute) + started_before = DateTime.add(now, 30, :minute) + + assert list_flow_activities_for(account, ended_after, started_before, subject) == {:ok, []} + assert list_flow_activities_for(flow, ended_after, started_before, subject) == {:ok, []} + end + + test "returns ordered by window start time flow activities within a time window", %{ + account: account, + flow: flow, + subject: subject + } do + now = DateTime.utc_now() |> DateTime.truncate(:second) + thirty_minutes_ago = DateTime.add(now, -30, :minute) + five_minutes_ago = DateTime.add(now, -5, :minute) + four_minutes_ago = DateTime.add(now, -4, :minute) + three_minutes_ago = DateTime.add(now, -4, :minute) + thirty_minutes_in_future = DateTime.add(now, 30, :minute) + sixty_minutes_in_future = DateTime.add(now, 60, :minute) + + activity1 = + Fixtures.Flows.create_activity( + flow: flow, + window_started_at: four_minutes_ago, + window_ended_at: three_minutes_ago + ) + + assert list_flow_activities_for( + account, + thirty_minutes_in_future, + sixty_minutes_in_future, + subject + ) == {:ok, []} + + assert list_flow_activities_for( + flow, + thirty_minutes_in_future, + sixty_minutes_in_future, + subject + ) == {:ok, []} + + assert list_flow_activities_for( + account, + thirty_minutes_ago, + five_minutes_ago, + subject + ) == {:ok, []} + + assert list_flow_activities_for( + flow, + thirty_minutes_ago, + five_minutes_ago, + subject + ) == {:ok, []} + + assert list_flow_activities_for( + account, + five_minutes_ago, + now, + subject + ) == {:ok, [activity1]} + + assert list_flow_activities_for( + flow, + five_minutes_ago, + now, + subject + ) == {:ok, [activity1]} + + activity2 = + Fixtures.Flows.create_activity( + flow: flow, + window_started_at: five_minutes_ago, + window_ended_at: four_minutes_ago + ) + + assert list_flow_activities_for( + account, + thirty_minutes_ago, + now, + subject + ) == {:ok, [activity2, activity1]} + + assert list_flow_activities_for( + flow, + thirty_minutes_ago, + now, + subject + ) == {:ok, [activity2, activity1]} + end + + test "returns error when subject has no permission to view flows", %{ + account: account, + flow: flow, + subject: subject + } do + now = DateTime.utc_now() + ended_after = DateTime.add(now, -30, :minute) + started_before = DateTime.add(now, 30, :minute) + + subject = Fixtures.Auth.remove_permissions(subject) + + assert list_flow_activities_for(account, ended_after, started_before, subject) == + {:error, + {:unauthorized, [missing_permissions: [Flows.Authorizer.view_flows_permission()]]}} + + assert list_flow_activities_for(flow, ended_after, started_before, subject) == + {:error, + {:unauthorized, [missing_permissions: [Flows.Authorizer.view_flows_permission()]]}} + end + end end diff --git a/elixir/apps/domain/test/domain/gateways_test.exs b/elixir/apps/domain/test/domain/gateways_test.exs index 7fee3425c..743ff6b7b 100644 --- a/elixir/apps/domain/test/domain/gateways_test.exs +++ b/elixir/apps/domain/test/domain/gateways_test.exs @@ -418,8 +418,8 @@ defmodule Domain.GatewaysTest do gateway = Fixtures.Gateways.create_gateway(account: account) {:ok, gateway} = fetch_gateway_by_id(gateway.id, subject, preload: [:group, :account]) - assert Ecto.assoc_loaded?(gateway.group) == true - assert Ecto.assoc_loaded?(gateway.account) == true + assert Ecto.assoc_loaded?(gateway.group) + assert Ecto.assoc_loaded?(gateway.account) end end diff --git a/elixir/apps/domain/test/domain/resources_test.exs b/elixir/apps/domain/test/domain/resources_test.exs index f83517615..57474501d 100644 --- a/elixir/apps/domain/test/domain/resources_test.exs +++ b/elixir/apps/domain/test/domain/resources_test.exs @@ -99,7 +99,7 @@ defmodule Domain.ResourcesTest do ) assert {:ok, resource} = fetch_resource_by_id(resource.id, subject, preload: :connections) - assert Ecto.assoc_loaded?(resource.connections) == true + assert Ecto.assoc_loaded?(resource.connections) assert length(resource.connections) == 1 end end @@ -222,7 +222,7 @@ defmodule Domain.ResourcesTest do assert {:ok, resource} = fetch_and_authorize_resource_by_id(resource.id, subject, preload: :connections) - assert Ecto.assoc_loaded?(resource.connections) == true + assert Ecto.assoc_loaded?(resource.connections) assert length(resource.connections) == 1 end end diff --git a/elixir/apps/domain/test/support/fixtures/flows.ex b/elixir/apps/domain/test/support/fixtures/flows.ex index c9860c9ee..22642313b 100644 --- a/elixir/apps/domain/test/support/fixtures/flows.ex +++ b/elixir/apps/domain/test/support/fixtures/flows.ex @@ -84,4 +84,46 @@ defmodule Domain.Fixtures.Flows do }) |> Repo.insert!() end + + def activity_attrs(attrs \\ %{}) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + unique_ipv4 = :inet.ntoa(unique_ipv4()) + {:ok, destination} = Domain.Types.IPPort.cast("#{unique_ipv4}:80") + + Enum.into(attrs, %{ + window_started_at: DateTime.add(now, -1, :minute), + window_ended_at: now, + destination: destination, + rx_bytes: 100, + tx_bytes: 200 + }) + end + + def create_activity(attrs) do + attrs = activity_attrs(attrs) + + {account, attrs} = + pop_assoc_fixture(attrs, :account, fn assoc_attrs -> + if relation = attrs[:flow] do + Repo.get!(Domain.Accounts.Account, relation.account_id) + else + Fixtures.Accounts.create_account(assoc_attrs) + end + end) + + {flow, attrs} = + pop_assoc_fixture(attrs, :flow, fn assoc_attrs -> + assoc_attrs + |> Enum.into(%{account: account}) + |> create_flow() + end) + + attrs = + attrs + |> Map.put(:flow_id, flow.id) + |> Map.put(:account_id, account.id) + + struct(Flows.Activity, attrs) + |> Repo.insert!() + end end diff --git a/elixir/apps/web/assets/css/scrollbar.css b/elixir/apps/web/assets/css/scrollbar.css index 420c624a2..8a7e7d95f 100644 --- a/elixir/apps/web/assets/css/scrollbar.css +++ b/elixir/apps/web/assets/css/scrollbar.css @@ -1,30 +1,28 @@ @layer utilities { - @variants responsive { - .no-scrollbar::-webkit-scrollbar { - display: block; - height: 0px; - background-color: initial; - border-radius: 10px; - transition: all 2s linear; - } + .no-scrollbar::-webkit-scrollbar { + display: block; + height: 0px; + background-color: initial; + border-radius: 10px; + transition: all 2s linear; + } - .no-scrollbar:hover::-webkit-scrollbar { - height: .5rem; - } + .no-scrollbar:hover::-webkit-scrollbar { + height: .5rem; + } - .no-scrollbar { - -ms-overflow-style: none; - scrollbar-width: none; - } + .no-scrollbar { + -ms-overflow-style: none; + scrollbar-width: none; + } - .no-scrollbar::-webkit-scrollbar-thumb { - background-color: rgb(228 228 231/var(--tw-bg-opacity)); - border-radius: 10px; - } + .no-scrollbar::-webkit-scrollbar-thumb { + background-color: rgb(228 228 231/var(--tw-bg-opacity)); + border-radius: 10px; + } - .no-scrollbar::-webkit-scrollbar-track { - background-color: rgb(249 250 251); - border-radius: 5px; - } - } + .no-scrollbar::-webkit-scrollbar-track { + background-color: rgb(249 250 251); + border-radius: 5px; + } } diff --git a/elixir/apps/web/assets/js/hooks.js b/elixir/apps/web/assets/js/hooks.js index 85678f084..e0694d1e4 100644 --- a/elixir/apps/web/assets/js/hooks.js +++ b/elixir/apps/web/assets/js/hooks.js @@ -22,6 +22,7 @@ Hooks.Copy = { }, } + // Update status indicator when sidebar is mounted or updated let statusIndicatorClassNames = { none: "bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-300", diff --git a/elixir/apps/web/assets/package.json b/elixir/apps/web/assets/package.json index 43c50b1d1..4d6aec503 100644 --- a/elixir/apps/web/assets/package.json +++ b/elixir/apps/web/assets/package.json @@ -1,6 +1,6 @@ { "dependencies": { "@fontsource/source-sans-pro": "^4.5.11", - "flowbite": "^1.6.5" + "flowbite": "^1.8.1" } } diff --git a/elixir/apps/web/assets/pnpm-lock.yaml b/elixir/apps/web/assets/pnpm-lock.yaml index 6ecb634de..6f8403895 100644 --- a/elixir/apps/web/assets/pnpm-lock.yaml +++ b/elixir/apps/web/assets/pnpm-lock.yaml @@ -9,8 +9,8 @@ dependencies: specifier: ^4.5.11 version: 4.5.11 flowbite: - specifier: ^1.6.5 - version: 1.6.5 + specifier: ^1.8.1 + version: 1.8.1 packages: @@ -22,8 +22,12 @@ packages: resolution: {integrity: sha512-P1st0aksCrn9sGZhp8GMYwBnQsbvAWsZAX44oXNNvLHGqAOcoVxmjZiohstwQ7SqKnbR47akdNi+uleWD8+g6A==} dev: false - /flowbite@1.6.5: - resolution: {integrity: sha512-eI4h3pIRI9d7grlYq14r0A01KUtw7189sPLLx/O2i7JyPEWpbleScfYuEc48XTeNjk1xxm/JHgZkD9kjyOWAlA==} + /@yr/monotone-cubic-spline@1.0.3: + resolution: {integrity: sha512-FQXkOta0XBSUPHndIKON2Y9JeQz5ZeMqLYZVVK93FliNBFm7LNMIZmY6FrMEB9XPcDbE2bekMbZD6kzDkxwYjA==} + dev: false + + /flowbite@1.8.1: + resolution: {integrity: sha512-lXTcO8a6dRTPFpINyOLcATCN/pK1Of/jY4PryklPllAiqH64tSDUsOdQpar3TO59ZXWwugm2e92oaqwH6X90Xg==} dependencies: '@popperjs/core': 2.11.8 mini-svg-data-uri: 1.4.4 @@ -33,3 +37,57 @@ packages: resolution: {integrity: sha512-r9deDe9p5FJUPZAk3A59wGH7Ii9YrjjWw0jmw/liSbHl2CHiyXj6FcDXDu2K3TjVAXqiJdaw3xxwlZZr9E6nHg==} hasBin: true dev: false + + /svg.draggable.js@2.2.2: + resolution: {integrity: sha512-JzNHBc2fLQMzYCZ90KZHN2ohXL0BQJGQimK1kGk6AvSeibuKcIdDX9Kr0dT9+UJ5O8nYA0RB839Lhvk4CY4MZw==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false + + /svg.easing.js@2.0.0: + resolution: {integrity: sha512-//ctPdJMGy22YoYGV+3HEfHbm6/69LJUTAqI2/5qBvaNHZ9uUFVC82B0Pl299HzgH13rKrBgi4+XyXXyVWWthA==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false + + /svg.filter.js@2.0.2: + resolution: {integrity: sha512-xkGBwU+dKBzqg5PtilaTb0EYPqPfJ9Q6saVldX+5vCRy31P6TlRCP3U9NxH3HEufkKkpNgdTLBJnmhDHeTqAkw==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false + + /svg.js@2.7.1: + resolution: {integrity: sha512-ycbxpizEQktk3FYvn/8BH+6/EuWXg7ZpQREJvgacqn46gIddG24tNNe4Son6omdXCnSOaApnpZw6MPCBA1dODA==} + dev: false + + /svg.pathmorphing.js@0.1.3: + resolution: {integrity: sha512-49HWI9X4XQR/JG1qXkSDV8xViuTLIWm/B/7YuQELV5KMOPtXjiwH4XPJvr/ghEDibmLQ9Oc22dpWpG0vUDDNww==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false + + /svg.resize.js@1.4.3: + resolution: {integrity: sha512-9k5sXJuPKp+mVzXNvxz7U0uC9oVMQrrf7cFsETznzUDDm0x8+77dtZkWdMfRlmbkEEYvUn9btKuZ3n41oNA+uw==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + svg.select.js: 2.1.2 + dev: false + + /svg.select.js@2.1.2: + resolution: {integrity: sha512-tH6ABEyJsAOVAhwcCjF8mw4crjXSI1aa7j2VQR8ZuJ37H2MBUbyeqYr5nEO7sSN3cy9AR9DUwNg0t/962HlDbQ==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false + + /svg.select.js@3.0.1: + resolution: {integrity: sha512-h5IS/hKkuVCbKSieR9uQCj9w+zLHoPh+ce19bBYyqF53g6mnPB8sAtIbe1s9dh2S2fCmYX2xel1Ln3PJBbK4kw==} + engines: {node: '>= 0.8.0'} + dependencies: + svg.js: 2.7.1 + dev: false diff --git a/elixir/apps/web/assets/tailwind.config.js b/elixir/apps/web/assets/tailwind.config.js index 58bb37e23..5db3f930e 100644 --- a/elixir/apps/web/assets/tailwind.config.js +++ b/elixir/apps/web/assets/tailwind.config.js @@ -6,7 +6,6 @@ const fs = require("fs") const path = require("path") const defaultTheme = require("tailwindcss/defaultTheme") - const firezoneColors = { // See our brand palette in Figma. // These have been reversed to match Tailwind's default order. @@ -70,7 +69,7 @@ module.exports = { extend: { colors: { brand: "#FD4F00", - primary: firezoneColors["heat-wave"], + primary: firezoneColors["heat-wave"], accent: firezoneColors["electric-violet"], neutral: firezoneColors["night-rider"] //primary: { @@ -89,7 +88,9 @@ module.exports = { }, }, plugins: [ - require("flowbite/plugin"), + require('flowbite/plugin')({ + charts: true, + }), require("@tailwindcss/forms"), plugin(({ addVariant }) => addVariant("phx-no-feedback", [".phx-no-feedback&", ".phx-no-feedback &"])), plugin(({ addVariant }) => addVariant("phx-click-loading", [".phx-click-loading&", ".phx-click-loading &"])), diff --git a/elixir/apps/web/lib/web/components/navigation_components.ex b/elixir/apps/web/lib/web/components/navigation_components.ex index 33c790ac5..a6fc52bb1 100644 --- a/elixir/apps/web/lib/web/components/navigation_components.ex +++ b/elixir/apps/web/lib/web/components/navigation_components.ex @@ -264,7 +264,7 @@ defmodule Web.NavigationComponents do Renders a single breadcrumb entry. should be wrapped in <.breadcrumbs> component. """ slot :inner_block, required: true, doc: "The label for the breadcrumb entry." - attr :path, :string, required: true, doc: "The path for the breadcrumb entry." + attr :path, :string, default: nil, doc: "The path for the breadcrumb entry." def breadcrumb(assigns) do ~H""" @@ -272,11 +272,19 @@ defmodule Web.NavigationComponents do
<%= @flow.client.name %>
+
+
+ <:action
+ navigate={~p"/#{@account}/flows/#{@flow}/activities.csv"}
+ icon="hero-arrow-down-on-square"
+ >
+ Export to CSV
+
+
+ <:content flash={@flash}>
+ <.vertical_table id="flow">
+ <.vertical_table_row>
+ <:label>Authorized At
+ <:value>
+ <.relative_datetime datetime={@flow.inserted_at} />
+
+
+ <.vertical_table_row>
+ <:label>Expires At
+ <:value>
+ <.relative_datetime datetime={@flow.expires_at} />
+
+
+ <.vertical_table_row>
+ <:label>Policy
+ <:value>
+ <.link
+ navigate={~p"/#{@account}/policies/#{@flow.policy_id}"}
+ class="font-medium text-blue-600 dark:text-blue-500 hover:underline"
+ >
+ <.policy_name policy={@flow.policy} />
+
+
+
+ <.vertical_table_row>
+ <:label>Client
+ <:value>
+ <.link
+ navigate={~p"/#{@account}/clients/#{@flow.client_id}"}
+ class="font-medium text-blue-600 dark:text-blue-500 hover:underline"
+ >
+ <%= @flow.client.name %>
+
+