Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use ets for janitor instead of syn #1290

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Realtime.Application do
)

Realtime.PromEx.set_metrics_tags()

:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
Expand Down
5 changes: 3 additions & 2 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,13 @@ defmodule Realtime.Tenants.Connect do
# Handles the `:setup_connected_user_events` continue step: subscribes this
# process to the cache-invalidation PubSub topic, schedules the connected-user
# check, and records the tenant id in the ETS table named after this module.
# Returns `{:noreply, state}` with the state unchanged.
def handle_continue(:setup_connected_user_events, state) do
%{
check_connected_user_interval: check_connected_user_interval,
# NOTE(review): the next line has no trailing comma and repeats the key on the
# line after it — looks like the removed side of the diff rendered next to
# the added side; confirm only one of the two belongs in the real file.
connected_users_bucket: connected_users_bucket
connected_users_bucket: connected_users_bucket,
tenant_id: tenant_id
} = state

# Asserts on `:ok`, so a failed subscription crashes this process.
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:invalidate_cache")
send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)

# Stored as a one-element tuple, so the tenant id itself is the ETS key.
:ets.insert(__MODULE__, {tenant_id})
{:noreply, state}
end

Expand Down
13 changes: 6 additions & 7 deletions lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ defmodule Realtime.Tenants.Janitor do
alias Realtime.Tenants
alias Realtime.Tenants.Migrations

@table_name Realtime.Tenants.Connect
@matchspec [{:"$1", [], [:"$1"]}]

@type t :: %__MODULE__{
timer: pos_integer() | nil,
region: String.t() | nil,
Expand Down Expand Up @@ -57,18 +60,13 @@ defmodule Realtime.Tenants.Janitor do
{:ok, state}
end

@table_name :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect"
@impl true
def handle_info(:delete_old_messages, state) do
Logger.info("Janitor started")
%{chunks: chunks, tasks: tasks} = state

matchspec = [
{{:"$1", :"$2", :"$3", :"$4", :"$5", Node.self()}, [], [:"$1"]}
]

new_tasks =
:ets.select(@table_name, matchspec)
:ets.select(@table_name, @matchspec)
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Expand Down Expand Up @@ -119,9 +117,10 @@ defmodule Realtime.Tenants.Janitor do

defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)

defp perform_mantaince_task(tenant_external_id) do
defp perform_mantaince_task({tenant_external_id}) do
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
Logger.info("Janitor starting realtime.messages cleanup")
:ets.delete(@table_name, tenant_external_id)

with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor"),
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.17",
version: "2.34.18",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
11 changes: 7 additions & 4 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
alias Realtime.Integration.WebsocketClient
alias Realtime.Repo
alias Realtime.Tenants
alias Realtime.Tenants.Cache
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Migrations

Expand Down Expand Up @@ -74,6 +75,8 @@
end

setup do
Cache.invalidate_tenant_cache(@external_id)
Process.sleep(500)
[tenant] = Tenant |> Repo.all() |> Repo.preload(:extensions)
:ok = Migrations.run_migrations(tenant)
%{tenant: tenant}
Expand Down Expand Up @@ -1228,7 +1231,7 @@
} do
change_tenant_configuration(:private_only, true)

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)

Check warning on line 1234 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

Process.sleep(100)

Expand Down Expand Up @@ -1371,10 +1374,9 @@
realtime_topic = "realtime:#{random_string()}"
WebsocketClient.join(socket, realtime_topic, %{config: config})

for _ <- 1..10 do
Process.sleep(100)

for _ <- 1..1000 do
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1..5 |> Enum.random() |> Process.sleep()
end

assert_receive %Message{
Expand Down Expand Up @@ -1446,8 +1448,9 @@
config = %{broadcast: %{self: true}, private: false}
realtime_topic = "realtime:#{random_string()}"

for _ <- 1..15 do
for _ <- 1..1000 do
WebsocketClient.join(socket, realtime_topic, %{config: config})
1..10 |> Enum.random() |> Process.sleep()
end

assert_receive %Message{
Expand Down Expand Up @@ -1566,10 +1569,10 @@
end

def setup_trigger(%{tenant: tenant, topic: topic} = context) do
Realtime.Tenants.Connect.shutdown(@external_id)

Check warning on line 1572 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
Process.sleep(500)

{:ok, db_conn} = Realtime.Tenants.Connect.connect(@external_id)

Check warning on line 1575 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

random_name = String.downcase("test_#{random_string()}")
query = "CREATE TABLE #{random_name} (id serial primary key, details text)"
Expand Down Expand Up @@ -1606,7 +1609,7 @@
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
query = "DROP TABLE #{random_name} CASCADE"
Postgrex.query!(db_conn, query, [])
Realtime.Tenants.Connect.shutdown(db_conn)

Check warning on line 1612 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

Process.sleep(500)
end)
Expand All @@ -1619,9 +1622,9 @@
defp change_tenant_configuration(limit, value) do
@external_id
|> Realtime.Tenants.get_tenant_by_external_id()
|> Realtime.Api.Tenant.changeset(%{limit => value})

Check warning on line 1625 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
|> Realtime.Repo.update!()

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)

Check warning on line 1628 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
end
end
10 changes: 7 additions & 3 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ defmodule Realtime.DatabaseTest do
test "on checkout error, handles raised exception as an error", %{db_conn: db_conn} do
assert capture_log(fn ->
Task.start(fn ->
Database.transaction(db_conn, fn conn ->
Postgrex.query!(conn, "SELECT pg_sleep(14)", [])
end)
Database.transaction(
db_conn,
fn conn ->
Postgrex.query!(conn, "SELECT pg_sleep(20)", [])
end,
timeout: 20000
)
end)

assert {:error, %DBConnection.ConnectionError{reason: :queue_timeout}} =
Expand Down
21 changes: 20 additions & 1 deletion test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,38 @@ defmodule Realtime.Tenants.ConnectTest do
alias Realtime.UsersCounter

# Per-test setup: empties the `Connect` ETS table, creates a tenant fixture,
# and exposes it to each test as `%{tenant: tenant}`.
setup do
# Clear entries left by earlier tests so assertions on the table contents
# only see what the current test inserted.
:ets.delete_all_objects(Connect)
tenant = tenant_fixture()
# NOTE(review): presumably drops any leftover replication slot from a previous
# test — confirm against the Cleanup helper.
Cleanup.ensure_no_replication_slot()
%{tenant: tenant}
end

describe "lookup_or_start_connection/1" do
test "if tenant exists and connected, returns the db connection", %{tenant: tenant} do
test "if tenant exists and connected, returns the db connection and tracks it in ets", %{
tenant: tenant
} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(100)
assert is_pid(db_conn)
Connect.shutdown(tenant.external_id)
end

# Connects and then shuts down ten freshly created tenants, and asserts that
# the `Connect` ETS table ends up holding exactly one `{external_id}` tuple per
# tenant. The assertion runs after every connection has been shut down, so it
# expects the entries to outlive the connections.
# NOTE(review): the test name says "users" but the loop creates tenants —
# confirm the intended wording.
test "tracks multiple users that connect and disconnect" do
expected =
for _ <- 1..10 do
tenant = tenant_fixture()
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
# NOTE(review): presumably gives the Connect process time to finish its
# setup and insert into ETS before shutdown — confirm; a fixed sleep can
# be flaky on slow CI.
Process.sleep(100)
assert is_pid(db_conn)
Connect.shutdown(tenant.external_id)
# Same shape as the tuples read back from the table below.
{tenant.external_id}
end

# This match spec matches every object and returns it whole, so `result` is
# the full table contents; both sides are sorted because a `:set` table has
# no defined iteration order.
result = :ets.select(Connect, [{:"$1", [], [:"$1"]}]) |> Enum.sort()
expected = Enum.sort(expected)
assert result == expected
end

test "on database disconnect, returns new connection", %{tenant: tenant} do
assert {:ok, old_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(500)
Expand Down
9 changes: 5 additions & 4 deletions test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Realtime.Tenants.JanitorTest do
alias Realtime.Tenants.Connect

setup do
:ets.delete_all_objects(Connect)
dev_tenant = Tenant |> Repo.all() |> hd()
timer = Application.get_env(:realtime, :janitor_schedule_timer)

Expand Down Expand Up @@ -89,6 +90,7 @@ defmodule Realtime.Tenants.JanitorTest do

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert_called(Migrations.create_partitions(:_))
assert :ets.tab2list(Connect) == []
end
end

Expand All @@ -113,16 +115,15 @@ defmodule Realtime.Tenants.JanitorTest do

tenant = tenant_fixture(%{extensions: extensions})
# Force add a bad tenant
:ets.insert(
:"syn_registry_by_name_Elixir.Realtime.Tenants.Connect",
{tenant.external_id, :undefined, :undefined, :undefined, :undefined, Node.self()}
)
:ets.insert(Connect, {tenant.external_id})

Process.sleep(250)

assert capture_log(fn ->
start_supervised!(Janitor)
Process.sleep(1000)
end) =~ "JanitorFailedToDeleteOldMessages"

assert :ets.tab2list(Connect) == []
end
end
Loading