Track protocol in activities (#4215)

Closes #4046
Closes #3026
This commit is contained in:
Andrew Dryga
2024-03-19 12:26:12 -06:00
committed by GitHub
parent 74026d8b13
commit 370a45571c
18 changed files with 405 additions and 26 deletions

View File

@@ -341,18 +341,20 @@ defmodule API.Gateway.Channel do
%{
"flow_id" => flow_id,
"destination" => destination,
"connectivity_type" => connectivity_type,
"rx_bytes" => rx_bytes,
"tx_bytes" => tx_bytes
} = metric
%{
flow_id: flow_id,
account_id: socket.assigns.gateway.account_id,
window_started_at: window_started_at,
window_ended_at: window_ended_at,
connectivity_type: String.to_existing_atom(connectivity_type),
destination: destination,
rx_bytes: rx_bytes,
tx_bytes: tx_bytes,
flow_id: flow_id,
account_id: socket.assigns.gateway.account_id
tx_bytes: tx_bytes
}
end)

View File

@@ -820,7 +820,7 @@ defmodule API.Gateway.ChannelTest do
now = DateTime.utc_now() |> DateTime.truncate(:second)
one_minute_ago = DateTime.add(now, -1, :minute)
{:ok, destination} = Domain.Types.IPPort.cast("127.0.0.1:80")
{:ok, destination} = Domain.Types.ProtocolIPPort.cast("tcp://127.0.0.1:80")
attrs = %{
"started_at" => DateTime.to_unix(one_minute_ago),
@@ -829,6 +829,7 @@ defmodule API.Gateway.ChannelTest do
%{
"flow_id" => flow.id,
"destination" => destination,
"connectivity_type" => "direct",
"rx_bytes" => 100,
"tx_bytes" => 200
}

View File

@@ -132,10 +132,19 @@ defmodule Domain.Flows do
def upsert_activities(activities) do
{num, _} = Repo.insert_all(Activity, activities, on_conflict: :nothing)
{:ok, num}
end
def fetch_last_activity_for(%Flow{} = flow, %Auth.Subject{} = subject, opts \\ []) do
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_flows_permission()) do
Activity.Query.all()
|> Activity.Query.by_flow_id(flow.id)
|> Activity.Query.first()
|> Ecto.Query.order_by([activities: activities], desc: activities.window_ended_at)
|> Repo.fetch(Activity.Query, opts)
end
end
def list_flow_activities_for(assoc, ended_after, started_before, subject, opts \\ [])
def list_flow_activities_for(

View File

@@ -5,10 +5,12 @@ defmodule Domain.Flows.Activity do
field :window_started_at, :utc_datetime
field :window_ended_at, :utc_datetime
field :destination, Domain.Types.IPPort
field :destination, Domain.Types.ProtocolIPPort
field :rx_bytes, :integer
field :tx_bytes, :integer
field :connectivity_type, Ecto.Enum, values: [:relayed, :direct]
belongs_to :flow, Domain.Flows.Flow
belongs_to :account, Domain.Accounts.Account
end

View File

@@ -21,6 +21,10 @@ defmodule Domain.Flows.Activity.Query do
where(queryable, [activities: activities], activities.window_ended_at > ^datetime)
end
def first(queryable) do
limit(queryable, 1)
end
# Pagination
@impl Domain.Repo.Query

View File

@@ -1,7 +1,7 @@
defmodule Domain.Types.IPPort do
@behaviour Ecto.Type
defstruct [:type, :address, :port]
defstruct [:address_type, :address, :port]
def type, do: :string
@@ -17,7 +17,7 @@ defmodule Domain.Types.IPPort do
with {:ok, {binary_address, binary_port}} <- parse_binary(binary),
{:ok, address} <- cast_address(binary_address),
{:ok, port} <- cast_port(binary_port) do
{:ok, %__MODULE__{type: type(address), address: address, port: port}}
{:ok, %__MODULE__{address_type: type(address), address: address, port: port}}
else
_error -> {:error, message: "is invalid"}
end
@@ -79,12 +79,12 @@ defmodule Domain.Types.IPPort do
ip |> :inet.ntoa() |> List.to_string()
end
def to_string(%__MODULE__{type: :ipv4, address: ip, port: port}) do
def to_string(%__MODULE__{address_type: :ipv4, address: ip, port: port}) do
ip = ip |> :inet.ntoa() |> List.to_string()
"#{ip}:#{port}"
end
def to_string(%__MODULE__{type: :ipv6, address: ip, port: port}) do
def to_string(%__MODULE__{address_type: :ipv6, address: ip, port: port}) do
ip = ip |> :inet.ntoa() |> List.to_string()
"[#{ip}]:#{port}"
end

View File

@@ -0,0 +1,76 @@
defmodule Domain.Types.ProtocolIPPort do
alias Domain.Types.IPPort
@behaviour Ecto.Type
defstruct [:protocol, :address_type, :address, :port]
def type, do: :string
def embed_as(_), do: :self
def equal?(left, right), do: left == right
def cast(%__MODULE__{} = ip_port), do: ip_port
def cast(binary) when is_binary(binary) do
binary = String.trim(binary)
with [protocol, rest] <- String.split(binary, "://", parts: 2),
{:ok,
%IPPort{
address_type: address_type,
address: address,
port: port
}} <- IPPort.cast(rest) do
{:ok,
%__MODULE__{
protocol: protocol,
address_type: address_type,
address: address,
port: port
}}
else
_error -> {:error, message: "is invalid"}
end
end
def cast(_), do: :error
def protocol_name("1"), do: "icmp"
def protocol_name("6"), do: "tcp"
def protocol_name("2"), do: "udp"
def protocol_name(binary) do
case Integer.parse(binary) do
{integer, ""} -> Kernel.to_string(integer)
_other -> binary
end
end
def dump(%__MODULE__{} = ip) do
{:ok, __MODULE__.to_string(ip)}
end
def dump(_), do: :error
def load(binary) when is_binary(binary) do
cast(binary)
end
def load(%__MODULE__{} = struct) do
{:ok, struct}
end
def load(_), do: :error
def to_string(%__MODULE__{
protocol: protocol,
address_type: address_type,
address: address,
port: port
}) do
ip_port = %IPPort{address_type: address_type, address: address, port: port}
protocol <> "://" <> IPPort.to_string(ip_port)
end
end

View File

@@ -11,3 +11,8 @@ end
defimpl String.Chars, for: Domain.Types.IPPort do
def to_string(%Domain.Types.IPPort{} = ip_port), do: Domain.Types.IPPort.to_string(ip_port)
end
defimpl String.Chars, for: Domain.Types.ProtocolIPPort do
def to_string(%Domain.Types.ProtocolIPPort{} = struct),
do: Domain.Types.ProtocolIPPort.to_string(struct)
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Repo.Migrations.AddFlowActivitiesConnectivityType do
use Ecto.Migration
def change do
# It's safe to delete now gateways don't send those metrics yet,
# and only data that can be inserted is generated by testing
execute("DELETE FROM flow_activities;")
alter table(:flow_activities) do
add(:connectivity_type, :string, null: false)
end
end
end

View File

@@ -869,8 +869,8 @@ started_at =
|> DateTime.truncate(:second)
|> DateTime.add(5, :minute)
{:ok, destination1} = Domain.Types.IPPort.cast("142.250.217.142:443")
{:ok, destination2} = Domain.Types.IPPort.cast("142.250.217.142:80")
{:ok, destination1} = Domain.Types.ProtocolIPPort.cast("tcp://142.250.217.142:443")
{:ok, destination2} = Domain.Types.ProtocolIPPort.cast("udp://142.250.217.142:111")
random_integer = fn ->
:math.pow(10, 10)
@@ -890,6 +890,7 @@ activities =
window_started_at: started_at,
window_ended_at: ended_at,
destination: Enum.random([destination1, destination2]),
connectivity_type: :direct,
rx_bytes: random_integer.(),
tx_bytes: random_integer.(),
flow_id: flow.id,

View File

@@ -429,7 +429,7 @@ defmodule Domain.FlowsTest do
now = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, destination} = Domain.Types.IPPort.cast("127.0.0.1:80")
{:ok, destination} = Domain.Types.ProtocolIPPort.cast("tcp://127.0.0.1:80")
activity = %{
window_started_at: DateTime.add(now, -1, :minute),
@@ -437,6 +437,7 @@ defmodule Domain.FlowsTest do
destination: destination,
rx_bytes: 100,
tx_bytes: 200,
connectivity_type: :direct,
flow_id: flow.id,
account_id: account.id
}
@@ -480,6 +481,69 @@ defmodule Domain.FlowsTest do
end
end
describe "fetch_last_activity_for/3" do
setup %{
account: account,
client: client,
gateway: gateway,
resource: resource,
policy: policy,
subject: subject
} do
flow =
Fixtures.Flows.create_flow(
account: account,
subject: subject,
client: client,
policy: policy,
resource: resource,
gateway: gateway
)
%{flow: flow}
end
test "returns error when flow has no activities", %{subject: subject, flow: flow} do
assert fetch_last_activity_for(flow, subject) == {:error, :not_found}
end
test "returns last activity for a flow", %{subject: subject, flow: flow} do
now = DateTime.utc_now() |> DateTime.truncate(:second)
thirty_minutes_ago = DateTime.add(now, -30, :minute)
five_minutes_ago = DateTime.add(now, -5, :minute)
four_minutes_ago = DateTime.add(now, -4, :minute)
Fixtures.Flows.create_activity(
flow: flow,
window_started_at: thirty_minutes_ago,
window_ended_at: five_minutes_ago
)
activity =
Fixtures.Flows.create_activity(
flow: flow,
window_started_at: five_minutes_ago,
window_ended_at: four_minutes_ago
)
assert {:ok, fetched_activity} = fetch_last_activity_for(flow, subject)
assert fetched_activity.id == activity.id
end
test "returns error when subject has no permission to view flows", %{
flow: flow,
subject: subject
} do
subject = Fixtures.Auth.remove_permissions(subject)
assert fetch_last_activity_for(flow, subject) ==
{:error,
{:unauthorized,
reason: :missing_permissions,
missing_permissions: [Flows.Authorizer.manage_flows_permission()]}}
end
end
describe "list_flow_activities_for/4" do
setup %{
account: account,

View File

@@ -14,10 +14,20 @@ defmodule Domain.Types.IPPortTest do
assert cast("1.1.1.1:0") == {:error, [message: "is invalid"]}
assert cast("1.1.1.1:1") ==
{:ok, %Domain.Types.IPPort{type: :ipv4, address: {1, 1, 1, 1}, port: 1}}
{:ok,
%Domain.Types.IPPort{
address_type: :ipv4,
address: {1, 1, 1, 1},
port: 1
}}
assert cast("1.1.1.1:65535") ==
{:ok, %Domain.Types.IPPort{type: :ipv4, address: {1, 1, 1, 1}, port: 65_535}}
{:ok,
%Domain.Types.IPPort{
address_type: :ipv4,
address: {1, 1, 1, 1},
port: 65_535
}}
assert cast("1.1.1.1:65536") == {:error, [message: "is invalid"]}
end
@@ -28,7 +38,7 @@ defmodule Domain.Types.IPPortTest do
{:ok, ip_port} = cast("1.1.1.1")
assert put_default_port(ip_port, 53) == %Domain.Types.IPPort{
type: :ipv4,
address_type: :ipv4,
address: {1, 1, 1, 1},
port: 53
}
@@ -38,7 +48,7 @@ defmodule Domain.Types.IPPortTest do
{:ok, ip_port} = cast("1.1.1.1:853")
assert put_default_port(ip_port, 53) == %Domain.Types.IPPort{
type: :ipv4,
address_type: :ipv4,
address: {1, 1, 1, 1},
port: 853
}

View File

@@ -91,12 +91,13 @@ defmodule Domain.Fixtures.Flows do
def activity_attrs(attrs \\ %{}) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
unique_ipv4 = :inet.ntoa(unique_ipv4())
{:ok, destination} = Domain.Types.IPPort.cast("#{unique_ipv4}:80")
{:ok, destination} = Domain.Types.ProtocolIPPort.cast("tcp://#{unique_ipv4}:80")
Enum.into(attrs, %{
window_started_at: DateTime.add(now, -1, :minute),
window_ended_at: now,
destination: destination,
connectivity_type: :relayed,
rx_bytes: 100,
tx_bytes: 200
})

View File

@@ -21,9 +21,12 @@ defmodule Web.Flows.DownloadActivities do
defp send_csv_header(conn) do
iodata =
Web.CSV.dump_to_iodata([
~w[window_started_at window_ended_at destination rx_bytes tx_bytes]
])
Web.CSV.dump_to_iodata([~w[
window_started_at window_ended_at
destination
connectivity_type
rx_bytes tx_bytes
]])
{:ok, conn} = chunk(conn, iodata)
conn
@@ -58,6 +61,7 @@ defmodule Web.Flows.DownloadActivities do
to_string(activity.window_started_at),
to_string(activity.window_ended_at),
to_string(activity.destination),
to_string(activity.connectivity_type),
activity.rx_bytes,
activity.tx_bytes
]

View File

@@ -13,10 +13,13 @@ defmodule Web.Flows.Show do
resource: []
]
) do
last_used_connectivity_type = get_last_used_connectivity_type(flow, socket.assigns.subject)
socket =
assign(socket,
page_title: "Flow #{flow.id}",
flow: flow
flow: flow,
last_used_connectivity_type: last_used_connectivity_type
)
{:ok, socket}
@@ -25,6 +28,13 @@ defmodule Web.Flows.Show do
end
end
defp get_last_used_connectivity_type(flow, subject) do
case Flows.fetch_last_activity_for(flow, subject) do
{:ok, activity} -> to_string(activity.connectivity_type)
_other -> "N/A"
end
end
def render(assigns) do
~H"""
<.breadcrumbs account={@account}>
@@ -97,6 +107,12 @@ defmodule Web.Flows.Show do
</.link>
</:value>
</.vertical_table_row>
<.vertical_table_row>
<:label>Connectivity Type</:label>
<:value>
<%= @last_used_connectivity_type %>
</:value>
</.vertical_table_row>
</.vertical_table>
</:content>
</.section>

View File

@@ -6,6 +6,11 @@ defimpl Phoenix.HTML.Safe, for: Domain.Types.IPPort do
def to_iodata(%Domain.Types.IPPort{} = ip_port), do: Domain.Types.IPPort.to_string(ip_port)
end
defimpl Phoenix.HTML.Safe, for: Domain.Types.ProtocolIPPort do
def to_iodata(%Domain.Types.ProtocolIPPort{} = struct),
do: Domain.Types.ProtocolIPPort.to_string(struct)
end
defimpl Phoenix.Param, for: Domain.Accounts.Account do
def to_param(%Domain.Accounts.Account{slug: slug}) when not is_nil(slug), do: slug
def to_param(%Domain.Accounts.Account{id: id}), do: id

View File

@@ -0,0 +1,167 @@
defmodule Web.Live.Flows.ShowTest do
use Web.ConnCase, async: true
setup do
account = Fixtures.Accounts.create_account()
actor = Fixtures.Actors.create_actor(type: :account_admin_user, account: account)
identity = Fixtures.Auth.create_identity(account: account, actor: actor)
subject = Fixtures.Auth.create_subject(account: account, actor: actor, identity: identity)
client = Fixtures.Clients.create_client(account: account, actor: actor, identity: identity)
flow = Fixtures.Flows.create_flow(account: account, client: client)
%{
account: account,
actor: actor,
identity: identity,
subject: subject,
client: client,
flow: flow
}
end
test "redirects to sign in page for unauthorized user", %{
account: account,
flow: flow,
conn: conn
} do
path = ~p"/#{account}/flows/#{flow}"
assert live(conn, path) ==
{:error,
{:redirect,
%{
to: ~p"/#{account}?#{%{redirect_to: path}}",
flash: %{"error" => "You must sign in to access this page."}
}}}
end
test "renders breadcrumbs item", %{
account: account,
flow: flow,
client: client,
identity: identity,
conn: conn
} do
{:ok, _lv, html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
assert item = Floki.find(html, "[aria-label='Breadcrumb']")
breadcrumbs = String.trim(Floki.text(item))
assert breadcrumbs =~ "Flows"
assert breadcrumbs =~ "#{client.name} flow"
end
test "renders flows details", %{
account: account,
identity: identity,
flow: flow,
conn: conn
} do
flow =
Repo.preload(flow,
policy: [:resource, :actor_group],
client: [],
gateway: [:group],
resource: []
)
activity =
Fixtures.Flows.create_activity(
account: account,
flow: flow,
window_started_at: DateTime.truncate(flow.inserted_at, :second),
window_ended_at: DateTime.truncate(flow.expires_at, :second)
)
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
table =
lv
|> element("#flow")
|> render()
|> vertical_table_to_map()
assert table["authorized at"]
assert table["expires at"]
assert table["connectivity type"] =~ to_string(activity.connectivity_type)
assert table["client"] =~ flow.client.name
assert table["client"] =~ to_string(flow.client_remote_ip)
assert table["client"] =~ flow.client_user_agent
assert table["gateway"] =~ flow.gateway.name
assert table["gateway"] =~ to_string(flow.gateway_remote_ip)
assert table["resource"] =~ flow.resource.name
assert table["policy"] =~ flow.policy.resource.name
assert table["policy"] =~ flow.policy.actor_group.name
end
test "allows downloading activities", %{
account: account,
flow: flow,
identity: identity,
conn: conn
} do
activity =
Fixtures.Flows.create_activity(
account: account,
flow: flow,
window_started_at: DateTime.truncate(flow.inserted_at, :second),
window_ended_at: DateTime.truncate(flow.expires_at, :second)
)
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/flows/#{flow}")
lv
|> element("a", "Export to CSV")
|> render_click()
assert_redirected(lv, ~p"/#{account}/flows/#{flow}/activities.csv")
controller_conn = get(conn, ~p"/#{account}/flows/#{flow}/activities.csv")
assert redirected_to(controller_conn) =~ ~p"/#{account}"
assert flash(controller_conn, :error) == "You must sign in to access this page."
controller_conn =
conn
|> authorize_conn(identity)
|> get(~p"/#{account}/flows/#{flow}/activities.csv")
assert response = response(controller_conn, 200)
assert response
|> String.trim()
|> String.split("\n")
|> Enum.map(&String.split(&1, "\t")) ==
[
[
"window_started_at",
"window_ended_at",
"destination",
"connectivity_type",
"rx_bytes",
"tx_bytes"
],
[
to_string(activity.window_started_at),
to_string(activity.window_ended_at),
to_string(activity.destination),
to_string(activity.connectivity_type),
to_string(activity.rx_bytes),
to_string(activity.tx_bytes)
]
]
end
end

View File

@@ -197,7 +197,7 @@ defmodule Web.Live.Sites.ShowTest do
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/resources")
|> live(~p"/#{account}/sites/#{group}")
resource_rows =
lv
@@ -208,7 +208,6 @@ defmodule Web.Live.Sites.ShowTest do
Enum.each(resource_rows, fn row ->
assert row["name"] =~ resource.name
assert row["address"] =~ resource.address
assert row["sites"] =~ group.name
assert row["authorized groups"] == "None, create a Policy to grant access."
end)
end
@@ -245,7 +244,7 @@ defmodule Web.Live.Sites.ShowTest do
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/resources")
|> live(~p"/#{account}/sites/#{group}")
resource_rows =
lv
@@ -267,7 +266,7 @@ defmodule Web.Live.Sites.ShowTest do
{:ok, lv, _html} =
conn
|> authorize_conn(identity)
|> live(~p"/#{account}/resources")
|> live(~p"/#{account}/sites/#{group}")
resource_rows =
lv