Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: on new keys, disconnect user socket #1292

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ defmodule Realtime.Api do
data: %{external_id: external_id}
})
when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
IO.inspect("trigger_disconnect")
RealtimeWeb.Endpoint.broadcast("user_socket:#{external_id}", "disconnect", %{})
end

defp maybe_trigger_disconnect(_), do: nil
Expand Down Expand Up @@ -198,7 +199,8 @@ defmodule Realtime.Api do
{value, settings} = Map.pop(extension.settings, from)
new_settings = Map.put(settings, to, value)

Ecto.Changeset.cast(extension, %{settings: new_settings}, [:settings])
extension
|> Ecto.Changeset.cast(%{settings: new_settings}, [:settings])
|> Repo.update!()
end
end
Expand Down
33 changes: 21 additions & 12 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
Realtime.UsersCounter.add(transport_pid, tenant_id)
RealtimeWeb.Endpoint.subscribe(tenant_topic)
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
Process.monitor(transport_pid)

is_new_api = new_api?(params)
pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
Expand Down Expand Up @@ -195,8 +196,9 @@
end
end

@impl true
def handle_info(
_any,
any,
%{
assigns: %{
rate_counter: %{avg: avg},
Expand All @@ -205,32 +207,28 @@
} = socket
)
when avg > max do
IO.inspect(any)

Check warning on line 210 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
message = "Too many messages per second"

shutdown_response(socket, message)
end

@impl true

def handle_info(:sync_presence = msg, socket) do
PresenceHandler.track(msg, socket)
end

@impl true
def handle_info(%{event: "postgres_cdc_rls_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(%{event: "postgres_cdc_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(
%{event: type, payload: payload} = msg,
%{assigns: %{policies: policies}} = socket
Expand Down Expand Up @@ -261,7 +259,6 @@
{:noreply, socket}
end

@impl true
def handle_info(:postgres_subscribe, %{assigns: %{channel_name: channel_name}} = socket) do
%{
assigns: %{
Expand Down Expand Up @@ -308,7 +305,6 @@
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end

@impl true
def handle_info(:confirm_token, %{assigns: %{pg_change_params: pg_change_params}} = socket) do
case confirm_token(socket) do
{:ok, claims, confirm_token_ref, _, _} ->
Expand All @@ -326,13 +322,19 @@
end
end

def handle_info(:disconnect, %{assigns: %{channel_name: channel_name}} = socket) do
def handle_info(%{event: "phx_leave"}, %{assigns: %{channel_name: channel_name}} = socket) do
Logger.info("Received operational call to disconnect channel")
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
{:stop, :shutdown, socket}
{:stop, {:shutdown, :left}, socket}
end

def handle_info({:shutdown, :closed}, %{assigns: %{channel_name: channel_name}} = socket) do
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
{:stop, {:shutdown, :closed}, socket}
end

def handle_info(msg, socket) do
IO.inspect(msg)

Check warning on line 337 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
log_error("UnhandledSystemMessage", msg)
{:noreply, socket}
end
Expand Down Expand Up @@ -432,7 +434,6 @@

def handle_in(type, payload, socket) do
socket = count(socket)

# Log info here so that bad messages from clients won't flood Logflare
# Can subscribe to a Channel with `log_level` `info` to see these messages
message = "Unexpected message from client of type `#{type}` with payload: #{inspect(payload)}"
Expand All @@ -442,8 +443,16 @@
end

@impl true
def terminate(reason, _state) do
def terminate({:shutdown, :closed}, %{assigns: %{channel_name: channel_name}} = socket) do
IO.inspect("Channel terminated with reason: shutdown")

Check warning on line 447 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
:ok
end

def terminate(reason, %{assigns: %{channel_name: channel_name}} = socket) do
IO.inspect("Channel terminated with reason: #{inspect(reason)}")

Check warning on line 453 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
Logger.debug("Channel terminated with reason: " <> inspect(reason))
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
:telemetry.execute([:prom_ex, :plugin, :realtime, :disconnected], %{})
:ok
end
Expand Down
4 changes: 4 additions & 0 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

@impl true
def connect(params, socket, opts) do
IO.inspect("connect")

Check warning on line 29 in lib/realtime_web/channels/user_socket.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.

if Application.fetch_env!(:realtime, :secure_channels) do
%{uri: %{host: host}, x_headers: headers} = opts

Expand Down Expand Up @@ -59,6 +61,8 @@
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
RealtimeWeb.Endpoint.subscribe(subscribers_id(external_id))

assigns = %RealtimeChannel.Assigns{
claims: claims,
jwt_secret: jwt_secret,
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
"open_api_spex": {:hex, :open_api_spex, "3.21.2", "6a704f3777761feeb5657340250d6d7332c545755116ca98f33d4b875777e1e5", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "f42ae6ed668b895ebba3e02773cfb4b41050df26f803f2ef634c72a7687dc387"},
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
"phoenix": {:hex, :phoenix, "1.7.18", "5310c21443514be44ed93c422e15870aef254cf1b3619e4f91538e7529d2b2e4", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "1797fcc82108442a66f2c77a643a62980f342bfeb63d6c9a515ab8294870004e"},
"phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},
Expand Down
64 changes: 64 additions & 0 deletions test/integration/integration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Integration do
import Generators

alias Realtime.Database
alias Realtime.Integration.WebsocketClient
alias Phoenix.Socket.V1
alias Realtime.Database
alias Realtime.Integration.WebsocketClient

@serializer V1.JSONSerializer
@secret "secure_jwt_secret"
@external_id "dev_tenant"
defp uri(port), do: "ws://#{@external_id}.localhost:#{port}/socket/websocket"
def token_valid(role, claims \\ %{}), do: generate_token(Map.put(claims, :role, role))
def token_no_role, do: generate_token()

def generate_token(claims \\ %{}) do
claims =
Map.merge(
%{
ref: "localhost",
iat: System.system_time(:second),
exp: System.system_time(:second) + 604_800
},
claims
)

{:ok, generate_jwt_token(@secret, claims)}
end

def get_connection(port, role \\ "anon", claims \\ %{}, params \\ %{vsn: "1.0.0", log_level: :warning}) do
params = Enum.reduce(params, "", fn {k, v}, acc -> "#{acc}&#{k}=#{v}" end)
uri = "#{uri(port)}?#{params}"

with {:ok, token} <- token_valid(role, claims),
{:ok, socket} <-
WebsocketClient.connect(self(), uri, @serializer, [{"x-api-key", token}]) do
{socket, token}
end
end

def rls_context(%{tenant: tenant} = context) do
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)

clean_table(db_conn, "realtime", "messages")
topic = Map.get(context, :topic, random_string())
message = message_fixture(tenant, %{topic: topic})

if policies = context[:policies] do
create_rls_policies(db_conn, policies, message)
end

Map.put(context, :topic, message.topic)
end

def change_tenant_configuration(limit, value) do
@external_id
|> Realtime.Tenants.get_tenant_by_external_id()
|> Realtime.Api.Tenant.changeset(%{limit => value})
|> Realtime.Repo.update!()

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)
end
end
Loading
Loading