Skip to content

Commit

Permalink
fix: Move logs to warnings; Add error for connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Jan 28, 2025
1 parent 6985ead commit b803ee6
Show file tree
Hide file tree
Showing 16 changed files with 213 additions and 141 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ This is the list of operational codes that can help you understand your deployme
| UnableToFindCounter | Error when trying to find a counter to track rate limits for a tenant |
| UnhandledProcessMessage | Unhandled message received by a Realtime process |
| UnableToSetPolicies | We were not able to set policies for this connection |
| IncreaseConnectionPool | The number of connections you have set for Realtime are not enough to handle your current use case |
| ConnectionInitializing | Database is initializing connection |
| DatabaseConnectionIssue | Database had connection issues and connection was not able to be established |
| UnableToConnectToProject | Unable to connect to Project database |
Expand Down
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ config :tailwind,
# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id, :project, :external_id, :application_name]
metadata: [:request_id, :project, :external_id, :application_name, :sub, :error_code]

# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason
Expand Down
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ config :joken,
current_time_adapter: RealtimeWeb.Joken.CurrentTime.Mock

# Print only errors during test
config :logger, level: :error
config :logger, level: :warning

# Configures Elixir's Logger
config :logger, :console,
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
msg =
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"

log_error("RealtimeDisabledForConfiguration", msg)
log_warning("RealtimeDisabledForConfiguration", msg)
rollback(conn, msg)

{:error, exception} ->
Expand Down
8 changes: 8 additions & 0 deletions lib/realtime/logs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ defmodule Realtime.Logs do
def log_error(code, error, metadata \\ []) do
Logger.error("#{code}: #{to_log(error)}", [error_code: code] ++ metadata)
end

@doc """
Logs warning with a given Operational Code
"""
@spec log_error(String.t(), any(), keyword()) :: :ok
def log_warning(code, warning, metadata \\ []) do
Logger.warning("#{code}: #{to_log(warning)}", [error_code: code] ++ metadata)
end
end

defimpl Jason.Encoder, for: DBConnection.ConnectionError do
Expand Down
13 changes: 9 additions & 4 deletions lib/realtime/tenants/authorization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Realtime.Tenants.Authorization do
alias Realtime.Database
alias Realtime.Repo
alias Realtime.Tenants.Authorization.Policies

alias DBConnection.ConnectionError
defstruct [:topic, :headers, :jwt, :claims, :role]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -59,9 +59,10 @@ defmodule Realtime.Tenants.Authorization do
def get_read_authorizations(%Socket{} = socket, db_conn, authorization_context) do
policies = Map.get(socket.assigns, :policies) || %Policies{}

with {:ok, %Policies{} = policies} <-
get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, Socket.assign(socket, :policies, policies)}
case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end

Expand All @@ -70,6 +71,7 @@ defmodule Realtime.Tenants.Authorization do

case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand All @@ -89,6 +91,7 @@ defmodule Realtime.Tenants.Authorization do

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand All @@ -98,13 +101,15 @@ defmodule Realtime.Tenants.Authorization do

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end

def get_write_authorizations(db_conn, db_conn, authorization_context) when is_pid(db_conn) do
case get_write_policies_for_connection(db_conn, authorization_context, %Policies{}) do
{:ok, %Policies{} = policies} -> {:ok, policies}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Realtime.Tenants.Migrations do
@moduledoc """
Run Realtime database migrations for tenant's database.
"""
use GenServer
use GenServer, restart: :transient

require Logger

Expand Down
36 changes: 19 additions & 17 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, state, assign(socket, assigns)}
else
{:error, :expired_token, msg} ->
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :missing_claims} ->
msg = "Fields `role` and `exp` are required in JWT"
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :expected_claims_map} ->
msg = "Token claims must be a map"
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :unauthorized, msg} ->
Logging.log_error_message(:error, "Unauthorized", msg)
Logging.log_error_message(:warning, "Unauthorized", msg)

{:error, :too_many_channels} ->
msg = "Too many channels"
Expand Down Expand Up @@ -150,6 +150,13 @@ defmodule RealtimeWeb.RealtimeChannel do
"Connecting to the project database"
)

{:error, :increase_connection_pool} ->
msg = "Please increase your connection pool size"
Logging.log_error_message(:warning, "IncreaseConnectionPool", msg)

{:error, :unable_to_set_policies, error} ->
Logging.log_error_message(:warning, "InvalidJWTExpiration", error)

{:error, invalid_exp} when is_integer(invalid_exp) and invalid_exp <= 0 ->
Logging.log_error_message(
:error,
Expand Down Expand Up @@ -180,13 +187,6 @@ defmodule RealtimeWeb.RealtimeChannel do
"Realtime is restarting, please standby"
)

{:error, :unable_to_set_policies} ->
Logging.log_error_message(
:error,
"UnableToSetPolicies",
"Unable to set policies for connection"
)

{:error, error} ->
Logging.log_error_message(:error, "UnknownErrorOnChannel", error)
end
Expand Down Expand Up @@ -284,7 +284,7 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, nil)}

error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand All @@ -294,13 +294,13 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())}

error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
rescue
error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand Down Expand Up @@ -585,8 +585,8 @@ defmodule RealtimeWeb.RealtimeChannel do
defp shutdown_response(socket, message) when is_binary(message) do
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
metadata = log_metadata(access_token)
log_error("ChannelShutdown", message, metadata)
push_system_message("system", socket, "error", message, channel_name)
log_warning("ChannelShutdown", message, metadata)
{:stop, :shutdown, socket}
end

Expand Down Expand Up @@ -749,9 +749,11 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, socket}
end
else
{:error, :increase_connection_pool} ->
{:error, :increase_connection_pool}

{:error, error} ->
log_error("UnableToSetPolicies", error)
{:error, :unable_to_set_policies}
{:error, :unable_to_set_policies, error}
end
end

Expand Down
22 changes: 16 additions & 6 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
"""
require Logger
import Phoenix.Socket, only: [assign: 3]
import Realtime.Logs

alias Phoenix.Socket
alias Realtime.GenCounter
Expand All @@ -29,14 +30,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
}
} = socket
) do
{:ok, socket} = run_authorization_check(socket, db_conn, authorization_context)
with {:ok, %{assigns: %{policies: policies}}} <-
run_authorization_check(socket, db_conn, authorization_context) do
case policies do
%Policies{broadcast: %BroadcastPolicies{write: false}} ->
Logger.info("Broadcast message ignored on #{tenant_topic}")

case socket.assigns.policies do
%Policies{broadcast: %BroadcastPolicies{write: false}} ->
Logger.info("Broadcast message ignored on #{tenant_topic}")
_ ->
send_message(self_broadcast, tenant_topic, payload)
end
else
{:error, :increase_connection_pool} ->
log_error("IncreaseConnectionPool", "Please increase your connection pool size")
{:error, :unable_to_set_policies}

_ ->
send_message(self_broadcast, tenant_topic, payload)
{:error, error} ->
log_error("UnableToSetPolicies", error)
{:error, :unable_to_set_policies}
end

socket = increment_rate_counter(socket)
Expand Down
11 changes: 5 additions & 6 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
) :: {:error, %{reason: binary()}}
def log_error_message(level, code, error, metadata \\ [])

def log_error_message(:warning, _code, error, metadata) do
error_msg = "Start channel error: " <> to_log(error)
Logger.warning(error_msg, metadata)
{:error, %{reason: error_msg}}
end

def log_error_message(:error, code, error, metadata) do
log_error(code, error, metadata)
{:error, %{reason: error}}
end

def log_error_message(:warning, code, error, metadata) do
log_warning(code, error, metadata)
{:error, %{reason: error}}
end
end
10 changes: 5 additions & 5 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ defmodule RealtimeWeb.UserSocket do
{:error, :tenant_not_found}

{:error, :expired_token, msg} ->
log_error_with_token_metadata(msg, token)
log_warning_with_token_metadata(msg, token)
{:error, :expired_token}

{:error, :missing_claims} ->
log_error_with_token_metadata("Fields `role` and `exp` are required in JWT", token)
log_warning_with_token_metadata("Fields `role` and `exp` are required in JWT", token)
{:error, :missing_claims}

error ->
Expand All @@ -108,14 +108,14 @@ defmodule RealtimeWeb.UserSocket do
end
end

defp log_error_with_token_metadata(msg, token) do
defp log_warning_with_token_metadata(msg, token) do
case Joken.peek_claims(token) do
{:ok, claims} ->
sub = Map.get(claims, "sub")
log_error("InvalidJWTToken", msg, sub: sub)
log_warning("InvalidJWTToken", msg, sub: sub)

_ ->
log_error("InvalidJWTToken", msg)
log_warning("InvalidJWTToken", msg)
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.10",
version: "2.34.11",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit b803ee6

Please sign in to comment.