diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 99c00ddbc..9f646e27f 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -666,9 +666,15 @@ defmodule API.Gateway.Channel do # Send regardless of cache state - if the Gateway has no flows for this resource, # it will simply ignore the message. resource = Cache.Cacheable.to_cache(resource) - push(socket, "resource_updated", Views.Resource.render(resource)) - {:noreply, socket} + case Resources.adapt_resource_for_version(resource, socket.assigns.gateway.last_seen_version) do + nil -> + {:noreply, socket} + + adapted_resource -> + push(socket, "resource_updated", Views.Resource.render(adapted_resource)) + {:noreply, socket} + end end defp handle_change(%Change{}, socket), do: {:noreply, socket} diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 2ac7fe831..3bcb85356 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -980,11 +980,63 @@ defmodule API.Gateway.ChannelTest do end test "sends resource_updated when filters change even without resource in cache", %{ - resource: resource, - socket: _socket + resource: resource } do - # Don't create any flows - simulate a gateway that reconnected - # and doesn't have this resource in its cache yet + # The resource is already connected to the gateway via the setup + # No flows exist yet, so the resource isn't in the cache + + old_data = %{ + "id" => resource.id, + "account_id" => resource.account_id, + "address" => resource.address, + "name" => resource.name, + "type" => "dns", + "filters" => [], + "ip_stack" => "dual" + } + + filters = [ + %{"protocol" => "tcp", "ports" => ["443"]}, + %{"protocol" => "udp", "ports" => ["53"]} + ] + + data = Map.put(old_data, "filters", filters) + + # Trigger the resource update via the Changes hook which will broadcast to the channel + Changes.Hooks.Resources.on_update(100, old_data, data) + + # Should still receive the update even though resource isn't in cache + assert_push "resource_updated", payload + + assert payload == %{ + address: resource.address, + id: resource.id, + name: resource.name, + type: :dns, + filters: [ + %{protocol: :tcp, port_range_start: 443, port_range_end: 443}, + %{protocol: :udp, port_range_start: 53, port_range_end: 53} + ] + } + end + + test "handles resource_updated with version adaptation for old gateways", %{ + gateway: gateway, + resource: resource, + gateway_group: gateway_group, + token: token + } do + # Create a new socket with the gateway set to an old version (< 1.2.0) + {:ok, _, _socket} = + API.Gateway.Socket + |> socket("gateway:#{gateway.id}", %{ + token_id: token.id, + gateway: Map.put(gateway, :last_seen_version, "1.1.0"), + gateway_group: gateway_group, + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test") + }) + |> subscribe_and_join(API.Gateway.Channel, "gateway") old_data = %{ "id" => resource.id, @@ -1006,7 +1058,7 @@ defmodule API.Gateway.ChannelTest do # Trigger the resource update Changes.Hooks.Resources.on_update(100, old_data, data) - # Should still receive the update even though resource isn't in cache + # Gateway with version 1.1.0 should receive the adapted resource assert_push "resource_updated", payload assert payload == %{ @@ -1021,6 +1073,95 @@ defmodule API.Gateway.ChannelTest do } end + test "does not send resource_updated when DNS adaptation fails", %{ + socket: socket + } do + # Update the channel process state to use an old gateway version (< 1.2.0) + :sys.replace_state(socket.channel_pid, fn state -> + put_in(state.assigns.gateway.last_seen_version, "1.1.0") + end) + + # Create a DNS resource with an address that can't be adapted + # For pre-1.2.0, addresses with wildcards not at the beginning get dropped + account = Fixtures.Accounts.create_account() + + resource = + Fixtures.Resources.create_resource( + account: account, + type: :dns, + address: "example.*.com" + ) + + old_data = %{ + "id" => resource.id, + "account_id" => resource.account_id, + "address" => "example.*.com", + "name" => resource.name, + "type" => "dns", + "filters" => [], + "ip_stack" => "dual" + } + + # Only change filters to trigger the filter-change handler + data = Map.put(old_data, "filters", [%{"protocol" => "tcp", "ports" => ["443"]}]) + + # Trigger the resource update + Changes.Hooks.Resources.on_update(100, old_data, data) + + # Should not receive any update since the address can't be adapted for version < 1.2.0 + refute_push "resource_updated", _payload + end + + test "adapts DNS resource address for old gateway versions", %{ + socket: socket, + account: account + } do + # Update the channel process state to use an old gateway version (< 1.2.0) + :sys.replace_state(socket.channel_pid, fn state -> + put_in(state.assigns.gateway.last_seen_version, "1.1.0") + end) + + # Create a DNS resource with an address that needs adaptation for old versions + # Use the existing account from setup so the channel receives the update + resource = + Fixtures.Resources.create_resource( + account: account, + type: :dns, + address: "**.example.com", + connections: [%{gateway_group_id: socket.assigns.gateway.group_id}] + ) + + old_data = %{ + "id" => resource.id, + "account_id" => resource.account_id, + "address" => "**.example.com", + "name" => resource.name, + "type" => "dns", + "filters" => [], + "ip_stack" => "dual" + } + + # Only change filters, not address, to trigger the filter-change handler + data = Map.put(old_data, "filters", [%{"protocol" => "tcp", "ports" => ["443"]}]) + + # Trigger the resource update + Changes.Hooks.Resources.on_update(100, old_data, data) + + # Should receive the update with the adapted address (** becomes * for pre-1.2.0) + assert_push "resource_updated", payload + + assert payload == %{ + # ** was converted to * + address: "*.example.com", + id: resource.id, + name: resource.name, + type: :dns, + filters: [ + %{protocol: :tcp, port_range_start: 443, port_range_end: 443} + ] + } + end + test "subscribes for relays presence", %{gateway: gateway, gateway_group: gateway_group} do relay_group = Fixtures.Relays.create_global_group() stamp_secret = Ecto.UUID.generate()