Skip to content

Commit

Permalink
fix: Call cache less often on Connect (#1213)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Nov 12, 2024
1 parent b8172d2 commit e746bdd
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 26 deletions.
6 changes: 4 additions & 2 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Realtime.Tenants do
alias Realtime.Tenants.Cache
alias Realtime.UsersCounter
alias Realtime.Database
alias Realtime.Tenants.Cache

@doc """
Gets a list of connected tenant `external_id` strings in the cluster or a node.
Expand Down Expand Up @@ -67,13 +68,14 @@ defmodule Realtime.Tenants do

{:ok, health_conn} ->
connected_cluster = UsersCounter.tenant_users(external_id)
Migrations.maybe_run_migrations(health_conn, external_id)
tenant = Cache.get_tenant_by_external_id(external_id)
Migrations.maybe_run_migrations(health_conn, tenant)
{:ok, %{healthy: true, db_connected: true, connected_cluster: connected_cluster}}

connected_cluster when is_integer(connected_cluster) ->
tenant = Cache.get_tenant_by_external_id(external_id)
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check", 1)
Migrations.maybe_run_migrations(db_conn, external_id)
Migrations.maybe_run_migrations(db_conn, tenant)
Process.alive?(db_conn) && GenServer.stop(db_conn)
{:ok, %{healthy: true, db_connected: false, connected_cluster: connected_cluster}}
end
Expand Down
6 changes: 3 additions & 3 deletions lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
send_message_and_count(tenant, sub_topic, event, payload, true)
end)

tenant_db_conn =
Connect.lookup_or_start_connection(tenant.external_id)

# Handle events for private channel
events
|> Map.get(true, [])
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
|> Enum.each(fn {topic, events} ->
tenant_db_conn =
Connect.lookup_or_start_connection(tenant.external_id)

if super_user do
Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
send_message_and_count(tenant, sub_topic, event, payload, false)
Expand Down
1 change: 1 addition & 0 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ defmodule Realtime.Tenants.Connect do
Logger.metadata(external_id: tenant_id, project: tenant_id)

with {:ok, acc} <- Piper.run(@pipes, state) do
acc = Map.delete(acc, :tenant)
{:ok, acc, {:continue, :setup_connected_user_events}}
else
{:error, :tenant_not_found} ->
Expand Down
4 changes: 1 addition & 3 deletions lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
Check tenant database connection.
"""
alias Realtime.Database
alias Realtime.Tenants.Cache

@application_name "realtime_connect"
@behaviour Realtime.Tenants.Connect.Piper
@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)
%{tenant: tenant} = acc

case Database.check_tenant_connection(tenant, @application_name) do
{:ok, conn} ->
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/connect/get_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.Tenants.Connect.GetTenant do
%{tenant_id: tenant_id} = acc

case Tenants.Cache.get_tenant_by_external_id(tenant_id) do
%Tenant{} -> {:ok, acc}
%Tenant{} = tenant -> {:ok, Map.put(acc, :tenant, tenant)}
_ -> {:error, :tenant_not_found}
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/realtime/tenants/connect/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ defmodule Realtime.Tenants.Connect.Migrations do
alias Realtime.Tenants.Migrations

@impl true
def run(%{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = acc) do
{:ok, _} = Migrations.maybe_run_migrations(db_conn_pid, tenant_id)
def run(%{db_conn_pid: db_conn_pid, tenant: tenant} = acc) do
{:ok, _} = Migrations.maybe_run_migrations(db_conn_pid, tenant)
{:ok, acc}
end
end
6 changes: 2 additions & 4 deletions lib/realtime/tenants/connect/start_counters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ defmodule Realtime.Tenants.Connect.StartCounters do
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Tenants
alias Realtime.Tenants.Cache

@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
%{tenant: tenant} = acc

with tenant when not is_nil(tenant) <- Cache.get_tenant_by_external_id(tenant_id),
:ok <- start_joins_per_second_counter(tenant),
with :ok <- start_joins_per_second_counter(tenant),
:ok <- start_max_events_counter(tenant),
:ok <- start_db_events_counter(tenant) do
{:ok, acc}
Expand Down
7 changes: 3 additions & 4 deletions lib/realtime/tenants/connect/start_replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ defmodule Realtime.Tenants.Connect.StartReplication do

@behaviour Realtime.Tenants.Connect.Piper
alias Realtime.BroadcastChanges.Handler
alias Realtime.Tenants.Cache

@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)
%{tenant: tenant} = acc

if tenant.notify_private_alpha do
opts = %Handler{tenant_id: tenant_id}
opts = %Handler{tenant_id: tenant.external_id}
supervisor_spec = Handler.supervisor_spec(tenant)

child_spec = %{
Expand Down
11 changes: 5 additions & 6 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.Tenants.Migrations do
alias Realtime.Database
alias Realtime.Registry.Unique
alias Realtime.Repo
alias Realtime.Tenants.Cache
alias Realtime.Api.Tenant

alias Realtime.Tenants.Migrations.{
CreateRealtimeSubscriptionTable,
Expand Down Expand Up @@ -196,19 +196,18 @@ defmodule Realtime.Tenants.Migrations do
If not all migrations have been run, it will run the missing migrations.
"""
@spec maybe_run_migrations(pid(), String.t()) :: {:ok, any()} | {:error, any()}
def maybe_run_migrations(db_conn, tenant_external_id) do
@spec maybe_run_migrations(pid(), Tenant.t()) :: {:ok, any()} | {:error, any()}
def maybe_run_migrations(db_conn, tenant) do
query =
"select * from pg_catalog.pg_tables where schemaname = 'realtime' and tablename = 'schema_migrations';"

%{extensions: [%{settings: settings} | _]} =
Cache.get_tenant_by_external_id(tenant_external_id)
%{extensions: [%{settings: settings} | _]} = tenant

Database.transaction(db_conn, fn transaction_conn ->
%{num_rows: num_rows} = Postgrex.query!(transaction_conn, query, [])

if num_rows < @expected_migration_count do
run_migrations(%__MODULE__{tenant_external_id: tenant_external_id, settings: settings})
run_migrations(%__MODULE__{tenant_external_id: tenant.external_id, settings: settings})
end
end)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.37",
version: "2.33.38",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down

0 comments on commit e746bdd

Please sign in to comment.