Skip to content

Commit

Permalink
fix: apply formatting and style fixes; enable in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Jan 31, 2025
1 parent 244c2a2 commit f61a44f
Show file tree
Hide file tree
Showing 63 changed files with 216 additions and 294 deletions.
28 changes: 28 additions & 0 deletions .credo.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
%{
configs: [
%{
name: "default",
files: %{
included: ["lib/", "src/", "web/", "apps/"],
excluded: []
},
plugins: [],
requires: [],
strict: false,
parse_timeout: 5000,
color: true,
checks: %{
disabled: [
{Credo.Check.Design.TagTODO, []},
{Credo.Check.Consistency.ExceptionNames, []},
{Credo.Check.Refactor.Nesting, []},
{Credo.Check.Refactor.CyclomaticComplexity, []},
{Credo.Check.Readability.WithSingleClause, []},
{Credo.Check.Readability.AliasOrder, []},
{Credo.Check.Readability.StringSigils, []},
{Credo.Check.Refactor.Apply, []}
]
}
}
]
}
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
import_deps: [:ecto, :ecto_sql, :phoenix, :open_api_spex],
subdirectories: ["priv/*/migrations"],
plugins: [Phoenix.LiveView.HTMLFormatter],
inputs: ["*.{heex,ex,exs}", "{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"]
inputs: ["*.{heex,ex,exs}", "{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"],
line_length: 120
]
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Run format check
run: mix format --check-formatted
- name: Credo checks
run: mix credo --strict --mute-exit-status
run: mix credo
- name: Retrieve PLT Cache
uses: actions/cache@v1
id: plt-cache
Expand Down
24 changes: 8 additions & 16 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@ socket_options =
end

config :realtime,
tenant_max_bytes_per_second:
System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
tenant_max_channels_per_client:
System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
tenant_max_concurrent_users:
System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),
tenant_max_events_per_second:
System.get_env("TENANT_MAX_EVENTS_PER_SECOND", "100") |> String.to_integer(),
tenant_max_joins_per_second:
System.get_env("TENANT_MAX_JOINS_PER_SECOND", "100") |> String.to_integer(),
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),
tenant_max_events_per_second: System.get_env("TENANT_MAX_EVENTS_PER_SECOND", "100") |> String.to_integer(),
tenant_max_joins_per_second: System.get_env("TENANT_MAX_JOINS_PER_SECOND", "100") |> String.to_integer(),
metrics_cleaner_schedule_timer_in_ms:
System.get_env("METRICS_CLEANER_SCHEDULE_TIMER_IN_MS", "1800000") |> String.to_integer(),
rpc_timeout: System.get_env("RPC_TIMEOUT", "30000") |> String.to_integer()
Expand All @@ -57,10 +52,8 @@ else
janitor_max_children: System.get_env("JANITOR_MAX_CHILDREN", "5") |> String.to_integer(),
janitor_chunk_size: System.get_env("JANITOR_CHUNK_SIZE", "10") |> String.to_integer(),
# defaults the runner to only start after 10 minutes
janitor_run_after_in_ms:
System.get_env("JANITOR_RUN_AFTER_IN_MS", "600000") |> String.to_integer(),
janitor_children_timeout:
System.get_env("JANITOR_CHILDREN_TIMEOUT", "5000") |> String.to_integer(),
janitor_run_after_in_ms: System.get_env("JANITOR_RUN_AFTER_IN_MS", "600000") |> String.to_integer(),
janitor_children_timeout: System.get_env("JANITOR_CHILDREN_TIMEOUT", "5000") |> String.to_integer(),
# defaults to 4 hours
janitor_schedule_timer:
:timer.hours(4)
Expand Down Expand Up @@ -220,8 +213,7 @@ cluster_topologies =
],
heartbeat_interval: 5_000,
node_timeout: 15_000,
channel_name:
System.get_env("POSTGRES_CLUSTER_CHANNEL_NAME", "realtime_cluster_#{version}")
channel_name: System.get_env("POSTGRES_CLUSTER_CHANNEL_NAME", "realtime_cluster_#{version}")
]
]
] ++ acc
Expand Down
3 changes: 3 additions & 0 deletions lib/extensions/extensions.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
defmodule Realtime.Extensions do
@moduledoc """
This module provides functions to get extension settings.
"""
def db_settings(type) do
db_settings =
Application.get_env(:realtime, :extensions)
Expand Down
4 changes: 1 addition & 3 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ defmodule Extensions.PostgresCdcRls do
if node(pid) == node(manager_pid) do
%{meta | manager: manager_pid, subs_pool: subs_pool}
else
Logger.warning(
"Node mismatch for tenant #{tenant} #{inspect(node(pid))} #{inspect(node(manager_pid))}"
)
Logger.warning("Node mismatch for tenant #{tenant} #{inspect(node(pid))} #{inspect(node(manager_pid))}")

meta
end
Expand Down
4 changes: 2 additions & 2 deletions lib/extensions/postgres_cdc_rls/db_settings.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Extensions.PostgresCdcRls.DbSettings do
Schema callbacks for CDC RLS implementation.
"""

def default() do
def default do
%{
"poll_interval_ms" => 100,
"poll_max_changes" => 100,
Expand All @@ -13,7 +13,7 @@ defmodule Extensions.PostgresCdcRls.DbSettings do
}
end

def required() do
def required do
[
{"region", &is_binary/1, false},
{"db_host", &is_binary/1, true},
Expand Down
4 changes: 1 addition & 3 deletions lib/extensions/postgres_cdc_rls/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do

_ =
Enum.reduce(topic_subscriptions, %{}, fn
{_pid,
{:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, tenant, is_new_api}},
cache ->
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, tenant, is_new_api}}, cache ->
for {bin_id, id} <- ids, reduce: [] do
acc ->
if MapSet.member?(sub_ids, bin_id) do
Expand Down
16 changes: 5 additions & 11 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

{:ok, diff} = Replications.get_pg_stat_activity_diff(conn, db_pid)

Logger.warning(
"Database PID #{db_pid} found in pg_stat_activity with state_change diff of #{diff}"
)
Logger.warning("Database PID #{db_pid} found in pg_stat_activity with state_change diff of #{diff}")

if retry_count > 3 do
case Replications.terminate_backend(conn, slot_name) do
Expand All @@ -122,17 +120,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)

{:noreply,
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}

{:error, reason} ->
log_error("PoolingReplicationError", reason)

{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)

{:noreply,
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
end
end

Expand All @@ -142,7 +138,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{:noreply, prepare_replication(state)}
end

def slot_name_suffix() do
def slot_name_suffix do
case Application.get_env(:realtime, :slot_name_suffix) do
nil -> ""
slot_name_suffix -> "_" <> slot_name_suffix
Expand All @@ -153,9 +149,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

defp convert_errors(_), do: nil

defp prepare_replication(
%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state
) do
defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
Expand Down
12 changes: 6 additions & 6 deletions lib/extensions/postgres_cdc_rls/subscription_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
Helpers.cancel_timer(ref)

q1 =
if !:queue.is_empty(q) do
if :queue.is_empty(q) do
q
else
{ids, q1} = Helpers.queue_take(q, @max_delete_records)
Logger.debug("delete sub id #{inspect(ids)}")

Expand All @@ -172,8 +174,6 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do

q
end
else
q
end

ref = if :queue.is_empty(q1), do: check_delete_queue(), else: check_delete_queue(1_000)
Expand Down Expand Up @@ -209,11 +209,11 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do

## Internal functions

defp check_oids(), do: Process.send_after(self(), :check_oids, @check_oids_interval)
defp check_oids, do: Process.send_after(self(), :check_oids, @check_oids_interval)

defp now(), do: System.system_time(:millisecond)
defp now, do: System.system_time(:millisecond)

defp check_no_users(), do: Process.send_after(self(), :check_no_users, @check_no_users_interval)
defp check_no_users, do: Process.send_after(self(), :check_no_users, @check_no_users_interval)

defp check_delete_queue(timeout \\ @timeout),
do: Process.send_after(self(), :check_delete_queue, timeout)
Expand Down
6 changes: 2 additions & 4 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

@spec create(conn(), String.t(), [map()], pid(), pid()) ::
{:ok, Postgrex.Result.t()}
| {:error,
Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
| {:error, Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
def create(conn, publication, params_list, manager, caller) do
sql = "with sub_tables as (
select
Expand Down Expand Up @@ -85,8 +84,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
defp params_to_log(map) do
map
|> Map.to_list()
|> Enum.map(fn {k, v} -> "#{k}: #{to_log(v)}" end)
|> Enum.join(", ")
|> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
end

@spec delete(conn(), String.t()) :: any()
Expand Down
12 changes: 6 additions & 6 deletions lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
Helpers.cancel_timer(ref)

new_queue =
if !:queue.is_empty(q) do
if :queue.is_empty(q) do
q
else
{ids, q1} = Helpers.queue_take(q, @max_delete_records)
Logger.warning("Delete #{length(ids)} phantom subscribers from db")

Expand All @@ -117,11 +119,9 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do

q
end
else
q
end

new_ref = if !:queue.is_empty(new_queue), do: check_delete_queue(), else: ref
new_ref = if :queue.is_empty(new_queue), do: ref, else: check_delete_queue()

{:noreply, %{state | delete_queue: %{ref: new_ref, queue: new_queue}}}
end
Expand Down Expand Up @@ -189,7 +189,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
Enum.reduce(pids, [], fn pid, acc -> if Process.alive?(pid), do: acc, else: [pid | acc] end)
end

defp check_delete_queue(), do: Process.send_after(self(), :check_delete_queue, 1000)
defp check_delete_queue, do: Process.send_after(self(), :check_delete_queue, 1000)

defp check_active_pids(), do: Process.send_after(self(), :check_active_pids, @timeout)
defp check_active_pids, do: Process.send_after(self(), :check_active_pids, @timeout)
end
9 changes: 3 additions & 6 deletions lib/extensions/postgres_cdc_rls/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
alias Extensions.PostgresCdcRls

@spec start_link :: :ignore | {:error, any} | {:ok, pid}
def start_link() do
def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

Expand All @@ -20,17 +20,14 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
children = [
{
PartitionSupervisor,
partitions: 20,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: PostgresCdcRls.DynamicSupervisor
partitions: 20, child_spec: DynamicSupervisor, strategy: :one_for_one, name: PostgresCdcRls.DynamicSupervisor
}
]

Supervisor.init(children, strategy: :one_for_one)
end

defp load_migrations_modules() do
defp load_migrations_modules do
{:ok, modules} = :application.get_key(:realtime, :modules)

modules
Expand Down
13 changes: 11 additions & 2 deletions lib/realtime/adapters/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ defmodule Realtime.Adapters.Changes do
@moduledoc """
This module provides structures of CDC changes.
"""
defmodule(Transaction, do: defstruct([:changes, :commit_timestamp]))
defmodule Transaction do
@moduledoc false
defstruct [:changes, :commit_timestamp]
end

defmodule NewRecord do
@moduledoc false
@derive {Jason.Encoder, except: [:subscription_ids]}
defstruct [
:columns,
Expand All @@ -24,6 +28,7 @@ defmodule Realtime.Adapters.Changes do
end

defmodule UpdatedRecord do
@moduledoc false
@derive {Jason.Encoder, except: [:subscription_ids]}
defstruct [
:columns,
Expand All @@ -39,6 +44,7 @@ defmodule Realtime.Adapters.Changes do
end

defmodule DeletedRecord do
@moduledoc false
@derive {Jason.Encoder, except: [:subscription_ids]}
defstruct [
:columns,
Expand All @@ -52,7 +58,10 @@ defmodule Realtime.Adapters.Changes do
]
end

defmodule(TruncatedRelation, do: defstruct([:type, :schema, :table, :commit_timestamp]))
defmodule TruncatedRelation do
@moduledoc false
defstruct [:type, :schema, :table, :commit_timestamp]
end
end

Protocol.derive(Jason.Encoder, Realtime.Adapters.Changes.Transaction)
Expand Down
Loading

0 comments on commit f61a44f

Please sign in to comment.