Skip to content

Commit

Permalink
feat: add info about the last-processed lsn to up-to-date message (#2307
Browse files Browse the repository at this point in the history
)

Closes #2215
  • Loading branch information
icehaunter authored Feb 6, 2025
1 parent 519fc8a commit b84cd5c
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-emus-grow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Expose globally last-seen LSN on up-to-date messages
12 changes: 7 additions & 5 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ defmodule Electric.Application do

storage = Electric.Config.get_env(:storage)

{kv_module, kv_fun, kv_params} =
Electric.Config.get_env(:persistent_kv)

persistent_kv = apply(kv_module, kv_fun, [kv_params])

router_opts =
Electric.Shapes.Api.plug_opts(
[
Expand All @@ -34,14 +39,11 @@ defmodule Electric.Application do
Electric.StackSupervisor.build_shared_opts(
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
storage: storage
storage: storage,
persistent_kv: persistent_kv
)
)

{kv_module, kv_fun, kv_params} =
Electric.Config.get_env(:persistent_kv)

persistent_kv = apply(kv_module, kv_fun, [kv_params])
replication_stream_id = Electric.Config.get_env(:replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"
Expand Down
5 changes: 4 additions & 1 deletion packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ defmodule Electric.Connection.Manager do
# Registry used for stack events
:stack_events_registry,
:tweaks,
:persistent_kv,
:ipv6_enabled,
awaiting_active: [],
drop_slot_requested: false,
Expand Down Expand Up @@ -190,6 +191,7 @@ defmodule Electric.Connection.Manager do
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks),
persistent_kv: Keyword.fetch!(opts, :persistent_kv),
ipv6_enabled: connection_opts[:ipv6]
}

Expand Down Expand Up @@ -328,7 +330,8 @@ defmodule Electric.Connection.Manager do
pool_opts: state.pool_opts,
replication_opts: state.replication_opts,
stack_events_registry: state.stack_events_registry,
tweaks: state.tweaks
tweaks: state.tweaks,
persistent_kv: state.persistent_kv
)

# Everything is ready to start accepting and processing logical messages from Postgres.
Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule Electric.Connection.Supervisor do
db_pool_opts = Keyword.fetch!(opts, :pool_opts)
replication_opts = Keyword.fetch!(opts, :replication_opts)
inspector = Keyword.fetch!(shape_cache_opts, :inspector)
persistent_kv = Keyword.fetch!(opts, :persistent_kv)

shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

Expand All @@ -51,7 +52,8 @@ defmodule Electric.Connection.Supervisor do
db_pool: Keyword.fetch!(db_pool_opts, :name)}

shape_log_collector_spec =
{Electric.Replication.ShapeLogCollector, stack_id: stack_id, inspector: inspector}
{Electric.Replication.ShapeLogCollector,
stack_id: stack_id, inspector: inspector, persistent_kv: persistent_kv}

child_spec =
Supervisor.child_spec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Electric.Replication.ShapeLogCollector do
"""
use GenStage

alias Electric.PersistentKV
alias Electric.Postgres
alias Electric.Postgres.Inspector
alias Electric.Replication.Changes
alias Electric.Replication.Changes.{Relation, Transaction}
Expand All @@ -15,6 +17,7 @@ defmodule Electric.Replication.ShapeLogCollector do
@schema NimbleOptions.new!(
stack_id: [type: :string, required: true],
inspector: [type: :mod_arg, required: true],
persistent_kv: [type: :any, required: true],
# see https://hexdocs.pm/gen_stage/GenStage.html#c:init/1-options
demand: [type: {:in, [:forward, :accumulate]}, default: :accumulate]
)
Expand Down Expand Up @@ -57,7 +60,20 @@ defmodule Electric.Replication.ShapeLogCollector do
Process.set_label({:shape_log_collector, opts.stack_id})
Logger.metadata(stack_id: opts.stack_id)
Electric.Telemetry.Sentry.set_tags_context(stack_id: opts.stack_id)
state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}})

last_seen_lsn =
case PersistentKV.get(opts.persistent_kv, "#{opts.stack_id}:last_processed_lsn") do
{:ok, last_seen_lsn} -> last_seen_lsn
{:error, :not_found} -> 0
end

state =
Map.merge(opts, %{
producer: nil,
subscriptions: {0, MapSet.new()},
last_seen_lsn: last_seen_lsn
})

# start in demand: :accumulate mode so that the ShapeCache is able to start
# all active consumers before we start sending transactions
{:producer, state,
Expand Down Expand Up @@ -85,7 +101,7 @@ defmodule Electric.Replication.ShapeLogCollector do
# client.
def handle_demand(_demand, %{producer: producer} = state) do
GenServer.reply(producer, :ok)
{:noreply, [], %{state | producer: nil}}
{:noreply, [], update_last_processed_lsn(%{state | producer: nil})}
end

def handle_cancel({:cancel, _}, from, state) do
Expand Down Expand Up @@ -124,6 +140,11 @@ defmodule Electric.Replication.ShapeLogCollector do

OpenTelemetry.add_span_attributes("txn.is_dropped": true)

state =
state
|> put_last_seen_lsn(txn.lsn)
|> update_last_processed_lsn()

{:reply, :ok, [], state}
end

Expand All @@ -144,7 +165,7 @@ defmodule Electric.Replication.ShapeLogCollector do

# we don't reply to this call. we only reply when we receive demand from
# the consumers, signifying that every one has processed this txn
{:noreply, [txn], %{state | producer: from}}
{:noreply, [txn], %{state | producer: from} |> put_last_seen_lsn(txn.lsn)}
end

defp handle_relation(rel, _from, %{subscriptions: {0, _}} = state) do
Expand Down Expand Up @@ -184,4 +205,21 @@ defmodule Electric.Replication.ShapeLogCollector do

state
end

defp put_last_seen_lsn(state, lsn) do
%{state | last_seen_lsn: max(state.last_seen_lsn, Postgres.Lsn.to_integer(lsn))}
end

defp update_last_processed_lsn(state) do
# state = %{state | last_processed_lsn: state.last_seen_lsn}
Logger.debug("Updating last processed lsn to #{state.last_seen_lsn}")

PersistentKV.set(
state.persistent_kv,
"#{state.stack_id}:last_processed_lsn",
state.last_seen_lsn
)

state
end
end
48 changes: 40 additions & 8 deletions packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Electric.Shapes.Api do
:inspector,
:pg_id,
:registry,
:persistent_kv,
:shape_cache,
:stack_events_registry,
:stack_id,
Expand All @@ -34,9 +35,6 @@ defmodule Electric.Shapes.Api do

# Aliasing for pattern matching
@before_all_offset LogOffset.before_all()

@up_to_date %{headers: %{control: "up-to-date"}}
@up_to_date_json Jason.encode!(@up_to_date)
@offset_out_of_bounds %{offset: ["out of bounds for this shape"]}
@must_refetch [%{headers: %{control: "must-refetch"}}]

Expand Down Expand Up @@ -338,7 +336,20 @@ defmodule Electric.Shapes.Api do
|> update_attrs(%{ot_is_immediate_response: false})
|> hold_until_change()
else
body = Stream.concat([log, maybe_up_to_date(request)])
global_last_seen_lsn = get_global_last_seen_lsn(request)

up_to_date_lsn =
if live? do
# In live mode, if we've gotten an actual update and are here and not in `empty_response`,
# then for this shape and this request we trust the locally last seen LSN.
chunk_end_offset.tx_offset
else
# In non-live mode, we're reading from disk. We trust the global max because it's updated
# after all disk writes. We take the max because we might be reading from disk before a global update.
max(global_last_seen_lsn, chunk_end_offset.tx_offset)
end

body = Stream.concat([log, maybe_up_to_date(request, up_to_date_lsn)])

%{response | chunked: true, body: encode_log(request, body)}
end
Expand Down Expand Up @@ -390,7 +401,24 @@ defmodule Electric.Shapes.Api do
defp empty_response(%Request{} = request) do
%{response: response} = update_attrs(request, %{ot_is_empty_response: true})

%{response | status: 204, body: encode_log(request, [@up_to_date])}
%{
response
| status: 204,
body: encode_log(request, [up_to_date_ctl(get_global_last_seen_lsn(request))])
}
end

defp get_global_last_seen_lsn(%Request{} = request) do
case Electric.PersistentKV.get(
request.api.persistent_kv,
"#{request.api.stack_id}:last_processed_lsn"
) do
{:ok, up_to_date_lsn} ->
up_to_date_lsn

{:error, :not_found} ->
0
end
end

defp update_attrs(%Request{} = request, attrs) do
Expand All @@ -399,14 +427,18 @@ defmodule Electric.Shapes.Api do
end)
end

defp maybe_up_to_date(%Request{response: %{up_to_date: true}}) do
[@up_to_date_json]
defp maybe_up_to_date(%Request{response: %{up_to_date: true}}, up_to_date_lsn) do
[up_to_date_ctl(up_to_date_lsn)]
end

defp maybe_up_to_date(%Request{response: %{up_to_date: false}}) do
defp maybe_up_to_date(%Request{response: %{up_to_date: false}}, _) do
[]
end

defp up_to_date_ctl(up_to_date_lsn) do
%{headers: %{control: "up-to-date", global_last_seen_lsn: up_to_date_lsn}}
end

defp with_span(%Request{} = request, name, attributes \\ [], fun) do
OpenTelemetry.with_span(name, attributes, stack_id(request), fun)
end
Expand Down
6 changes: 6 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ defmodule Electric.StackSupervisor do
server: Electric.Postgres.Inspector.EtsInspector.name(stack_id: stack_id)}
)

persistent_kv = Access.fetch!(opts, :persistent_kv)

[
shape_cache: shape_cache,
publication_manager: publication_manager,
Expand All @@ -168,6 +170,7 @@ defmodule Electric.StackSupervisor do
storage: storage_mod_arg(opts),
inspector: inspector,
stack_id: stack_id,
persistent_kv: persistent_kv,
get_service_status: fn -> Electric.ServiceStatus.check(stack_id) end
]
end
Expand Down Expand Up @@ -214,6 +217,8 @@ defmodule Electric.StackSupervisor do

shape_changes_registry_name = registry_name(stack_id)

:ets.new(:"#{stack_id}:stack_meta", [:set, :public, :named_table])

shape_cache_opts = [
stack_id: stack_id,
storage: storage,
Expand Down Expand Up @@ -247,6 +252,7 @@ defmodule Electric.StackSupervisor do
stack_id: stack_id,
persistent_kv: config.persistent_kv
],
persistent_kv: config.persistent_kv,
shape_cache_opts: shape_cache_opts,
tweaks: config.tweaks
]
Expand Down
Loading

0 comments on commit b84cd5c

Please sign in to comment.