chore: Minor Connection.Manager and Consumer cleanup and flake reduction (#2305)

- Removes the `ConnectionBackoff` struct in favour of a plain map - its scope
is small and the struct was needlessly verbose (@robacourt nits are good, we
like them). See the `:backoff` sketch below.
- Uses the `State` struct everywhere - since we have it, it makes sense to
use it
- Removes the `ShapeCache` dependency from `Consumer`, as it already has
`ShapeStatus` available
- Uses `Consumer.whereis` in the `Consumer` tests to allow Mox assertions,
for consistency
- Fixes the `Consumer` test flakes by waiting for `set_snapshot_started` in
the setup before running the tests: if the snapshot is not ready by the time
a test defines its own Mox expectations and allowances, those definitions
"override" the ones set in the setup. See the setup sketch below.
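For context, a minimal sketch of how the Erlang `:backoff` library used by `Connection.Manager` behaves; the values mirror the `:backoff.init(1000, 10_000)` call in the diff, but the snippet is illustrative rather than code from this PR:

```elixir
# Illustrative sketch of the :backoff state that now lives in a plain map.
# init/2 takes the initial and maximum delay in milliseconds.
b = :backoff.init(1000, 10_000)

# Each failed connection attempt grows the delay, capped at 10_000ms.
{delay_after_first_failure, b} = :backoff.fail(b)
{delay_after_second_failure, b} = :backoff.fail(b)

# A successful connection resets the delay to the initial 1000ms, which is
# what mark_connection_succeeded/1 does before clearing retries_started_at.
{_reset_delay, _b} = :backoff.succeed(b)

IO.inspect({delay_after_first_failure, delay_after_second_failure})
```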

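The shape of the test fix, as a rough sketch - module, function, and message names here are illustrative, not the actual test code:

```elixir
# Hypothetical sketch of the setup synchronization: block in setup until the
# consumer reports that the snapshot has started, so the Mox expectations a
# test defines afterwards can't race with a snapshot task that is still
# exercising the ones defined in setup.
setup ctx do
  consumer = Electric.Shapes.Consumer.whereis(ctx.stack_id, ctx.shape_handle)

  # Wait for set_snapshot_started before handing control to the test body.
  assert_receive {:set_snapshot_started, ^consumer}, 1_000

  %{consumer: consumer}
end
```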
One thing that really worked for me, and that I hadn't tried before, is
using a tool like `stress` (e.g. `stress -c 10`) to max out my machine's
resources while running the tests, simulating the lower-resource systems the
tests run on in CI. I managed to consistently reproduce the consumer test
flakes that way and debug them.
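If installing `stress` isn't an option, a similar effect can be approximated from `test_helper.exs` - a hypothetical alternative, not part of this PR:

```elixir
# Hypothetical stand-in for running `stress -c 10` in another terminal:
# saturate every BEAM scheduler with a busy loop for the whole test run.
# Enable with: SIMULATE_LOAD=1 mix test
if System.get_env("SIMULATE_LOAD") do
  for _ <- 1..System.schedulers_online() do
    spawn(fn -> Stream.iterate(0, &(&1 + 1)) |> Stream.run() end)
  end
end
```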


No changeset because this is all refactoring and test fixes.
msfstef authored Feb 6, 2025
1 parent 8955a58 commit eaf0c29
Showing 7 changed files with 113 additions and 193 deletions.
81 changes: 40 additions & 41 deletions packages/sync-service/lib/electric/connection/manager.ex
@@ -69,14 +69,6 @@ defmodule Electric.Connection.Manager do
]
end

defmodule ConnectionBackoff do
defstruct [
:backoff,
:retries_started_at,
:timer_ref
]
end

use GenServer

require Logger
@@ -193,7 +185,11 @@ defmodule Electric.Connection.Manager do
timeline_opts: timeline_opts,
shape_cache_opts: shape_cache_opts,
pg_lock_acquired: false,
connection_backoff: %ConnectionBackoff{backoff: :backoff.init(1000, 10_000)},
connection_backoff: %{
backoff: :backoff.init(1000, 10_000),
retries_started_at: nil,
timer_ref: nil
},
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks),
@@ -207,13 +203,13 @@
end

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
def handle_call(:get_pg_version, _from, %State{pg_version: pg_version} = state) do
# If we haven't queried the PG version by the time it is requested, that's a fatal error.
false = is_nil(pg_version)
{:reply, pg_version, state}
end

def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do
def handle_call(:get_status, _from, %State{pg_lock_acquired: pg_lock_acquired} = state) do
status =
cond do
not pg_lock_acquired ->
@@ -230,16 +226,16 @@
{:reply, status, state}
end

def handle_call(:await_active, from, %{pool_pid: nil} = state) do
{:noreply, %{state | awaiting_active: [from | state.awaiting_active]}}
def handle_call(:await_active, from, %State{pool_pid: nil} = state) do
{:noreply, %State{state | awaiting_active: [from | state.awaiting_active]}}
end

def handle_call(:await_active, _from, state) do
{:reply, :ok, state}
end

def handle_call(:drop_replication_slot_on_stop, _from, state) do
{:reply, :ok, %{state | drop_slot_requested: true}}
{:reply, :ok, %State{state | drop_slot_requested: true}}
end

def handle_call(:report_retained_wal_size, _from, state) do
@@ -263,7 +259,7 @@
case start_lock_connection(opts) do
{:ok, pid, connection_opts} ->
state = mark_connection_succeeded(state)
state = %{state | lock_connection_pid: pid, connection_opts: connection_opts}
state = %State{state | lock_connection_pid: pid, connection_opts: connection_opts}

Electric.StackSupervisor.dispatch_stack_event(
state.stack_events_registry,
@@ -290,7 +286,7 @@
case start_replication_client(opts) do
{:ok, pid, connection_opts} ->
state = mark_connection_succeeded(state)
state = %{state | replication_client_pid: pid, connection_opts: connection_opts}
state = %State{state | replication_client_pid: pid, connection_opts: connection_opts}

if is_nil(state.pool_pid) do
# This is the case where Connection.Manager starts connections from the initial state.
@@ -346,7 +342,7 @@
log_collector_pid = lookup_log_collector_pid(shapes_sup_pid)
Process.monitor(log_collector_pid)

state = %{
state = %State{
state
| pool_pid: pool_pid,
shape_log_collector_pid: log_collector_pid,
@@ -357,7 +353,7 @@
GenServer.reply(awaiting, :ok)
end

{:noreply, %{state | awaiting_active: []}}
{:noreply, %State{state | awaiting_active: []}}

{:error, reason} ->
handle_connection_error(reason, state, "regular")
@@ -367,11 +363,11 @@
@impl true
def handle_info(
{:timeout, tref, step},
%{connection_backoff: %ConnectionBackoff{timer_ref: tref} = conn_backoff} = state
%State{connection_backoff: %{timer_ref: tref} = conn_backoff} = state
) do
state = %State{
state
| connection_backoff: %ConnectionBackoff{conn_backoff | timer_ref: nil}
| connection_backoff: %{conn_backoff | timer_ref: nil}
}

handle_continue(step, state)
@@ -391,7 +387,7 @@
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

state = %{state | replication_client_pid: nil}
state = %State{state | replication_client_pid: nil}
state = schedule_reconnection(:start_replication_client, state)
{:noreply, state}
end
Expand All @@ -408,7 +404,10 @@ defmodule Electric.Connection.Manager do
{:stop, {:shutdown, reason}, state}
end

def handle_info({:DOWN, _ref, :process, pid, reason}, %{shape_log_collector_pid: pid} = state) do
def handle_info(
{:DOWN, _ref, :process, pid, reason},
%State{shape_log_collector_pid: pid} = state
) do
# The replication client would normally exit together with the shape log collector when it
# is blocked on a call to either `ShapeLogCollector.handle_relation_msg/2` or
# `ShapeLogCollector.store_transaction/2` and the log collector encounters a storage error.
@@ -433,7 +432,7 @@
drop_slot(state)
end

{:noreply, %{state | shape_log_collector_pid: nil, replication_client_pid: nil}}
{:noreply, %State{state | shape_log_collector_pid: nil, replication_client_pid: nil}}
end

# Periodically log the status of the lock connection until it is acquired for
@@ -448,11 +447,11 @@
end

@impl true
def handle_cast(:exclusive_connection_lock_acquired, %{pg_lock_acquired: false} = state) do
def handle_cast(:exclusive_connection_lock_acquired, %State{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
# so failure to open a replication connection should be reported ASAP.
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
{:noreply, %State{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do
Expand All @@ -467,7 +466,7 @@ defmodule Electric.Connection.Manager do
)

{:noreply,
%{
%State{
state
| pg_version: server_version,
pg_system_identifier: system_identifier,
@@ -561,7 +560,7 @@
)

# disable IPv6 and retry immediately
state = %{
state = %State{
state
| ipv6_enabled: false,
connection_opts: connection_opts |> Keyword.put(:ipv6, false) |> update_tcp_opts()
@@ -656,7 +655,7 @@
defp schedule_reconnection(
step,
%State{
connection_backoff: %ConnectionBackoff{
connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at
}
@@ -668,7 +667,7 @@

%State{
state
| connection_backoff: %ConnectionBackoff{
| connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at || System.monotonic_time(:millisecond),
timer_ref: tref
@@ -677,29 +676,29 @@
end

# If total backoff time is 0 then there were no reconnection attempts
defp mark_connection_succeeded(
%State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}} = state
),
do: state
defp mark_connection_succeeded(%State{connection_backoff: %{retries_started_at: nil}} = state),
do: state

# Otherwise, reset the backoff and total backoff time
defp mark_connection_succeeded(
%State{connection_backoff: %ConnectionBackoff{backoff: backoff}} = state
) do
defp mark_connection_succeeded(%State{connection_backoff: %{backoff: backoff}} = state) do
{_, backoff} = :backoff.succeed(backoff)
Logger.info("Reconnection succeeded after #{inspect(total_retry_time(state))}ms")

%State{
state
| connection_backoff: %ConnectionBackoff{backoff: backoff}
| connection_backoff: %{
state.connection_backoff
| backoff: backoff,
retries_started_at: nil
}
}
end

defp total_retry_time(%State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}}),
defp total_retry_time(%State{connection_backoff: %{retries_started_at: nil}}),
do: 0

defp total_retry_time(%State{
connection_backoff: %ConnectionBackoff{retries_started_at: retries_started_at}
connection_backoff: %{retries_started_at: retries_started_at}
}),
do: retries_started_at - System.monotonic_time(:millisecond)

@@ -780,11 +779,11 @@
log_collector_pid
end

defp drop_slot(%{pool_pid: nil} = _state) do
defp drop_slot(%State{pool_pid: nil} = _state) do
Logger.warning("Skipping slot drop, pool connection not available")
end

defp drop_slot(%{pool_pid: pool} = state) do
defp drop_slot(%State{pool_pid: pool} = state) do
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)
slot_temporary? = Keyword.fetch!(state.replication_opts, :slot_temporary?)
28 changes: 0 additions & 28 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -9,9 +9,6 @@ defmodule Electric.ShapeCacheBehaviour do
@type shape_def :: Shape.t()
@type xmin :: non_neg_integer()

@doc "Update a shape's status with a new log offset"
@callback update_shape_latest_offset(shape_handle(), LogOffset.t(), keyword()) :: :ok

@callback get_shape(shape_def(), opts :: Access.t()) ::
{shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil
@callback get_or_create_shape_handle(shape_def(), opts :: Access.t()) ::
@@ -113,24 +110,6 @@ defmodule Electric.ShapeCache do
end
end

@impl Electric.ShapeCacheBehaviour
@spec update_shape_latest_offset(shape_handle(), LogOffset.t(), opts :: Access.t()) ::
:ok | {:error, term()}
def update_shape_latest_offset(shape_handle, latest_offset, opts) do
meta_table = get_shape_meta_table(opts)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

if shape_status.set_latest_offset(meta_table, shape_handle, latest_offset) do
:ok
else
Logger.warning(
"Tried to update latest offset for shape #{shape_handle} which doesn't exist"
)

:error
end
end

@impl Electric.ShapeCacheBehaviour
@spec list_shapes(Access.t()) :: [{shape_handle(), Shape.t()}]
def list_shapes(opts) do
@@ -348,13 +327,6 @@
publication_manager: state.publication_manager,
chunk_bytes_threshold: state.chunk_bytes_threshold,
log_producer: state.log_producer,
shape_cache:
{__MODULE__,
%{
server: state.name,
shape_meta_table: state.shape_meta_table,
stack_id: state.stack_id
}},
registry: state.registry,
db_pool: state.db_pool,
run_with_conn_fn: state.run_with_conn_fn,
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shapes/consumer.ex
@@ -264,7 +264,7 @@ defmodule Electric.Shapes.Consumer do
shape_handle: shape_handle,
log_state: log_state,
chunk_bytes_threshold: chunk_bytes_threshold,
shape_cache: {shape_cache, shape_cache_opts},
shape_status: {shape_status, shape_status_state},
registry: registry,
storage: storage
} = state
@@ -326,7 +326,7 @@
Map.new(shape_attrs(state.shape_handle, state.shape))
)

shape_cache.update_shape_latest_offset(shape_handle, last_log_offset, shape_cache_opts)
shape_status.set_latest_offset(shape_status_state, shape_handle, last_log_offset)

notify_listeners(registry, :new_changes, shape_handle, last_log_offset)

@@ -12,7 +12,6 @@ defmodule Electric.Shapes.ConsumerSupervisor do
shape: [type: {:struct, Electric.Shapes.Shape}, required: true],
inspector: [type: :mod_arg, required: true],
log_producer: [type: @genserver_name_schema, required: true],
shape_cache: [type: :mod_arg, required: true],
registry: [type: :atom, required: true],
shape_status: [type: :mod_arg, required: true],
storage: [type: :mod_arg, required: true],
@@ -42,7 +42,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
@test_pg_id "12345"

# Higher timeout is needed for some tests that tend to run slower on CI.
@receive_timeout 1000
@receive_timeout 2000

def load_column_info({"public", "users"}, _),
do: {:ok, @test_shape.table_info[{"public", "users"}][:columns]}
@@ -780,7 +780,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"}
end

@tag stack_ready_timeout: 1000
@tag stack_ready_timeout: 5000
test "waits until stack ready and proceeds", ctx do
conn_task =
Task.async(fn ->
47 changes: 0 additions & 47 deletions packages/sync-service/test/electric/shape_cache_test.exs
@@ -328,53 +328,6 @@ defmodule Electric.ShapeCacheTest do
} = map
end

test "updates latest offset correctly", %{
shape_cache_opts: opts,
storage: storage
} do
{shape_handle, initial_offset} = ShapeCache.get_or_create_shape_handle(@shape, opts)
assert :started = ShapeCache.await_snapshot_start(shape_handle, opts)

assert {^shape_handle, offset_after_snapshot} =
ShapeCache.get_or_create_shape_handle(@shape, opts)

expected_offset_after_log_entry =
LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0)

:ok =
ShapeCache.update_shape_latest_offset(shape_handle, expected_offset_after_log_entry, opts)

assert {^shape_handle, offset_after_log_entry} =
ShapeCache.get_or_create_shape_handle(@shape, opts)

assert initial_offset == @zero_offset
assert initial_offset == offset_after_snapshot
assert LogOffset.compare(offset_after_log_entry, offset_after_snapshot) == :gt
assert offset_after_log_entry == expected_offset_after_log_entry

# Stop snapshot process gracefully to prevent errors being logged in the test
storage = Storage.for_shape(shape_handle, storage)

stream =
Storage.get_log_stream(
LogOffset.before_all(),
LogOffset.last_before_real_offsets(),
storage
)

Stream.run(stream)
end

test "errors if appending to untracked shape_handle", %{shape_cache_opts: opts} do
shape_handle = "foo"
log_offset = LogOffset.new(1000, 0)

{:error, log} =
with_log(fn -> ShapeCache.update_shape_latest_offset(shape_handle, log_offset, opts) end)

assert log =~ "Tried to update latest offset for shape #{shape_handle} which doesn't exist"
end

test "correctly propagates the error", %{shape_cache_opts: opts} do
shape = %Shape{
@shape