Skip to content

Commit

Permalink
fix: Fix total retry time from reconnnection attempts (#2306)
Browse files Browse the repository at this point in the history
The sign of the total retry time was reversed. Lack of tests -> dumb
mistakes.

Is there any reason we don't have a test suite for the
`Connection.Manager`? It's a bit of an overhead to set it up, I've
separated the `ConnectionBackoff` logic to basically just slightly
enhance `:backoff` with total retry time and reverted the rest of the
logic in the connection manager to be how it was (probably was the best
option to start with).
  • Loading branch information
msfstef authored Feb 6, 2025
1 parent eaf0c29 commit 519fc8a
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/breezy-knives-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Separate `ConnectionBackoff` logic for `Connection.Manager` to enhance `:bakckoff` functionality.
63 changes: 14 additions & 49 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ defmodule Electric.Connection.Manager do
end

use GenServer
alias Electric.Connection.Manager.ConnectionBackoff

require Logger

Expand Down Expand Up @@ -185,11 +186,7 @@ defmodule Electric.Connection.Manager do
timeline_opts: timeline_opts,
shape_cache_opts: shape_cache_opts,
pg_lock_acquired: false,
connection_backoff: %{
backoff: :backoff.init(1000, 10_000),
retries_started_at: nil,
timer_ref: nil
},
connection_backoff: {ConnectionBackoff.init(1000, 10_000), nil},
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks),
Expand Down Expand Up @@ -363,13 +360,9 @@ defmodule Electric.Connection.Manager do
@impl true
def handle_info(
{:timeout, tref, step},
%State{connection_backoff: %{timer_ref: tref} = conn_backoff} = state
%State{connection_backoff: {conn_backoff, tref}} = state
) do
state = %State{
state
| connection_backoff: %{conn_backoff | timer_ref: nil}
}

state = %State{state | connection_backoff: {conn_backoff, nil}}
handle_continue(step, state)
end

Expand Down Expand Up @@ -600,7 +593,7 @@ defmodule Electric.Connection.Manager do
{:database_connection_failed,
%{
message: message,
total_retry_time: total_retry_time(state)
total_retry_time: ConnectionBackoff.total_retry_time(elem(state.connection_backoff, 0))
}}
)

Expand Down Expand Up @@ -655,53 +648,25 @@ defmodule Electric.Connection.Manager do
defp schedule_reconnection(
step,
%State{
connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at
}
connection_backoff: {conn_backoff, _}
} = state
) do
{time, backoff} = :backoff.fail(backoff)
{time, conn_backoff} = ConnectionBackoff.fail(conn_backoff)
tref = :erlang.start_timer(time, self(), step)
Logger.warning("Reconnecting in #{inspect(time)}ms")

%State{
state
| connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at || System.monotonic_time(:millisecond),
timer_ref: tref
}
}
%State{state | connection_backoff: {conn_backoff, tref}}
end

# If total backoff time is 0 then there were no reconnection attempts
defp mark_connection_succeeded(%State{connection_backoff: %{retries_started_at: nil}} = state),
do: state
defp mark_connection_succeeded(%State{connection_backoff: {conn_backoff, tref}} = state) do
{total_retry_time, conn_backoff} = ConnectionBackoff.succeed(conn_backoff)

# Otherwise, reset the backoff and total backoff time
defp mark_connection_succeeded(%State{connection_backoff: %{backoff: backoff}} = state) do
{_, backoff} = :backoff.succeed(backoff)
Logger.info("Reconnection succeeded after #{inspect(total_retry_time(state))}ms")
if total_retry_time > 0 do
Logger.info("Reconnection succeeded after #{inspect(total_retry_time)}ms")
end

%State{
state
| connection_backoff: %{
state.connection_backoff
| backoff: backoff,
retries_started_at: nil
}
}
%State{state | connection_backoff: {conn_backoff, tref}}
end

defp total_retry_time(%State{connection_backoff: %{retries_started_at: nil}}),
do: 0

defp total_retry_time(%State{
connection_backoff: %{retries_started_at: retries_started_at}
}),
do: retries_started_at - System.monotonic_time(:millisecond)

defp update_ssl_opts(connection_opts) do
ssl_opts =
case connection_opts[:sslmode] do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Electric.Connection.Manager.ConnectionBackoff do
@type connection_backoff :: %{
backoff: :backoff.backoff(),
retries_started_at: nil | integer()
}

@spec init(pos_integer(), :infinity | pos_integer()) :: connection_backoff()
def init(start, max),
do: %{backoff: :backoff.init(start, max), retries_started_at: nil}

@spec succeed(connection_backoff()) :: {pos_integer(), connection_backoff()}
def succeed(%{backoff: backoff} = conn_backoff) do
{_, backoff} = :backoff.succeed(backoff)

{total_retry_time(conn_backoff), %{backoff: backoff, retries_started_at: nil}}
end

@spec fail(connection_backoff()) :: {pos_integer(), connection_backoff()}
def fail(%{backoff: backoff, retries_started_at: retries_started_at}) do
{time, backoff} = :backoff.fail(backoff)

{time,
%{
backoff: backoff,
retries_started_at: retries_started_at || System.monotonic_time(:millisecond)
}}
end

@spec total_retry_time(connection_backoff()) :: pos_integer()
def total_retry_time(%{retries_started_at: nil}),
do: 0

def total_retry_time(%{retries_started_at: retries_started_at}),
do: System.monotonic_time(:millisecond) - retries_started_at
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Electric.Connection.Manager.ConnectionBackoffTest do
use ExUnit.Case
alias Electric.Connection.Manager.ConnectionBackoff

describe "total_retry_time/1" do
test "returns 0 when no failures present" do
backoff = ConnectionBackoff.init(100, :infinity)
assert ConnectionBackoff.total_retry_time(backoff) == 0
end

test "returns elapsed time since first failure" do
backoff = ConnectionBackoff.init(100, :infinity)
{_time, failed_backoff} = ConnectionBackoff.fail(backoff)

# Simulate some delay
Process.sleep(50)
assert ConnectionBackoff.total_retry_time(failed_backoff) >= 50
end

test "resets to 0 after succeed/1" do
backoff = ConnectionBackoff.init(100, :infinity)
{_time, failed_backoff} = ConnectionBackoff.fail(backoff)

# Simulate some delay
Process.sleep(50)
assert ConnectionBackoff.total_retry_time(failed_backoff) >= 50
{_retry_time, reset_backoff} = ConnectionBackoff.succeed(failed_backoff)

assert ConnectionBackoff.total_retry_time(reset_backoff) == 0
end
end
end

0 comments on commit 519fc8a

Please sign in to comment.