feat!: adds compaction #2231

Open · wants to merge 4 commits into main (changes from all commits)
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/log_items.ex
@@ -126,4 +126,13 @@ defmodule Electric.LogItems do
|> Enum.zip()
|> Map.new()
end

def merge_updates(u1, u2) do
%{
"key" => u1["key"],
"offset" => u2["offset"],
"headers" => Map.take(u1["headers"], ["operation", "relation"]),
"value" => Map.merge(u1["value"], u2["value"])
}
end
end
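For illustration, here is a sketch of how merge_updates/2 combines two updates to the same row. The key, offsets and column values below are made up; only the shape of the maps follows the function above (key and headers from the first update, offset from the second, values merged with the second winning):

u1 = %{
  "key" => ~S|"public"."issues"/"1"|,
  "offset" => "0_0",
  "value" => %{"id" => "1", "title" => "Initial", "status" => "open"},
  "headers" => %{"operation" => "update", "relation" => ["public", "issues"]}
}

u2 = %{
  "key" => ~S|"public"."issues"/"1"|,
  "offset" => "0_2",
  "value" => %{"status" => "closed"},
  "headers" => %{"operation" => "update", "relation" => ["public", "issues"]}
}

Electric.LogItems.merge_updates(u1, u2)
# => %{
#      "key" => ~S|"public"."issues"/"1"|,
#      "offset" => "0_2",
#      "headers" => %{"operation" => "update", "relation" => ["public", "issues"]},
#      "value" => %{"id" => "1", "title" => "Initial", "status" => "closed"}
#    }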
156 changes: 137 additions & 19 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do
use Retry
require Logger

alias Electric.ShapeCache.LogChunker
alias Electric.Telemetry.OpenTelemetry
alias Electric.Replication.LogOffset
import Electric.Replication.LogOffset, only: :macros
@@ -17,6 +18,7 @@ defmodule Electric.ShapeCache.FileStorage do
@xmin_key :snapshot_xmin
@snapshot_meta_key :snapshot_meta
@snapshot_started_key :snapshot_started
@compaction_info_key :compaction_info

@behaviour Electric.ShapeCache.Storage

@@ -27,8 +29,10 @@
:data_dir,
:cubdb_dir,
:snapshot_dir,
:log_dir,
:stack_id,
:extra_opts,
:chunk_bytes_threshold,
version: @version
]

@@ -38,7 +42,12 @@
storage_dir = Keyword.get(opts, :storage_dir, "./shapes")

# Always scope the provided storage dir by stack id
%{base_path: Path.join(storage_dir, stack_id), stack_id: stack_id}
%{
base_path: Path.join(storage_dir, stack_id),
stack_id: stack_id,
chunk_bytes_threshold:
Keyword.get(opts, :chunk_bytes_threshold, LogChunker.default_chunk_size_threshold())
}
end

@impl Electric.ShapeCache.Storage
@@ -59,8 +68,10 @@
data_dir: data_dir,
cubdb_dir: Path.join([data_dir, "cubdb"]),
snapshot_dir: Path.join([data_dir, "snapshots"]),
log_dir: Path.join([data_dir, "log"]),
stack_id: stack_id,
extra_opts: Map.get(opts, :extra_opts, %{})
extra_opts: Map.get(opts, :extra_opts, %{}),
chunk_bytes_threshold: opts.chunk_bytes_threshold
}
end

@@ -87,7 +98,8 @@
defp initialise_filesystem(opts) do
with :ok <- File.mkdir_p(opts.data_dir),
:ok <- File.mkdir_p(opts.cubdb_dir),
:ok <- File.mkdir_p(opts.snapshot_dir) do
:ok <- File.mkdir_p(opts.snapshot_dir),
:ok <- File.mkdir_p(opts.log_dir) do
:ok
end
end
@@ -324,7 +336,7 @@ defmodule Electric.ShapeCache.FileStorage do
log_items
|> Enum.map(fn
{:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
{offset, json_log_item} -> {log_key(offset), json_log_item}
{offset, key, type, json_log_item} -> {log_key(offset), {key, type, json_log_item}}
Contributor:
type is operation, right? Please could it be renamed as such?

Contributor:
And in the various other places in lib and test where type is used.

Contributor:
I'd also accept op_type, as you've used elsewhere.

end)
|> then(&CubDB.put_multi(opts.db, &1))
else
@@ -371,6 +383,98 @@ defmodule Electric.ShapeCache.FileStorage do
def get_log_stream(%LogOffset{} = offset, max_offset, %FS{} = opts),
do: stream_log_chunk(offset, max_offset, opts)

def compact(%FS{} = opts) do
CubDB.select(opts.db,
min_key: chunk_checkpoint_start(),
max_key: chunk_checkpoint_end(),
reverse: true
)
# Keep the last 2 chunks as-is just in case
Contributor:
In case of what?

Contributor Author:
Because we don't want to lose idempotency of writes from Postgres to the same txn.

Contributor:
So why 2?

Contributor Author:
Vibes, mostly. I'm assuming that over 20MB of data we're far enough from the tip that we don't care about this property (it only happens when the PG connection drops at an unfortunate moment where we've written the log but didn't ack, so only a few transactions at the tip can be affected). 20MB seems like a reasonable buffer.

Contributor:
So currently we ack after every transaction, right? You could have a transaction that was over 20MB. That also seems like a likely time for Electric to crash (but then succeed on restart). Could this result in duplicate entries?

Contributor Author:
So that's 3 transactions over 10MB (to make separate chunks) coming in, then the compaction triggers, and we start 3 txns back? I assume it's possible, just not likely. Since we're tracking the compaction boundary in CubDB, we can check it before writing the incoming transactions and just discard those that are definitely in the past.

Contributor:
Ah, ok. So you're saying transactions cannot span chunks. In which case, as we ack after every transaction, I think we're safe, right? Or is there some scenario where we could start 3 txns back?

More broadly I'm making a few points:

  • this explanation for why we keep the last two chunks (much of the detail of this thread) should be in the code comment
  • how we ack may change in the future, making this more of an issue, so it would be good to guard against it
  • I don't like dismissing issues as unlikely. I've seen plenty of "unlikely" bugs occur :) We should put in a failsafe, even if that failsafe is to log and crash horribly (your "discard those that are definitely in the past" idea seems a much nicer failsafe! See the sketch after this function.)

|> Enum.take(3)
|> case do
[_, _, {key, _}] ->
compact(opts, offset(key))

_ ->
# Not enough chunks to warrant compaction
:ok
end
end
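To make the failsafe discussed in the thread above concrete, here is a minimal sketch (not part of this PR) of a guard that drops incoming log items already covered by the recorded compaction boundary before they are appended. `compaction_boundary/1` is a hypothetical helper; `FS`, `LogOffset`, `CubDB`, `@compaction_info_key` and the `is_log_offset_lt/2` guard are the ones already used in this module.

# Sketch only: drop incoming items at or below the compaction boundary so a
# replayed transaction cannot be appended behind an already-compacted region.
defp discard_compacted_items(log_items, %FS{} = opts) do
  case compaction_boundary(opts) do
    # No compaction has happened yet, keep everything
    nil ->
      log_items

    %LogOffset{} = boundary ->
      Enum.filter(log_items, fn
        {:chunk_boundary, offset} when is_log_offset_lt(boundary, offset) -> true
        {offset, _key, _op_type, _json} when is_log_offset_lt(boundary, offset) -> true
        _ -> false
      end)
  end
end

# Hypothetical helper: the offset part of @compaction_info_key, or nil if unset.
defp compaction_boundary(%FS{} = opts) do
  case CubDB.fetch(opts.db, @compaction_info_key) do
    {:ok, {_log, upper_bound}} -> upper_bound
    :error -> nil
  end
end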

def compact(%FS{} = opts, %LogOffset{} = upper_bound) do
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {_, ^upper_bound}} ->
:ok

{:ok, {old_log, _}} ->
# compact further
new_log_file_path =
Path.join(
opts.log_dir,
"compact_log_#{DateTime.utc_now() |> DateTime.to_unix(:millisecond)}.electric"
)

new_log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> FS.LogFile.write_log_file(new_log_file_path <> ".new")

merged_log =
FS.Compaction.merge_and_compact(
old_log,
new_log,
new_log_file_path,
opts.chunk_bytes_threshold
)

CubDB.put(opts.db, @compaction_info_key, {merged_log, upper_bound})
delete_compacted_keys(opts, upper_bound)
FS.Compaction.rm_log(new_log)
FS.Compaction.rm_log(old_log)
:ok

:error ->
log_file_path = Path.join(opts.log_dir, "compact_log.electric")

log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> FS.LogFile.write_log_file(log_file_path)
|> FS.Compaction.compact_in_place(opts.chunk_bytes_threshold)

CubDB.put(opts.db, @compaction_info_key, {log, upper_bound})
delete_compacted_keys(opts, upper_bound)
:ok
end
end

defp delete_compacted_keys(%FS{} = opts, upper_bound) do
compacted_chunks =
CubDB.select(opts.db,
min_key: chunk_checkpoint_start(),
max_key: chunk_checkpoint_key(upper_bound),
max_key_inclusive: true
)
|> Enum.map(fn {key, _} -> key end)

compacted_logs =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound)
)
|> Enum.map(fn {key, _} -> key end)

CubDB.delete_multi(opts.db, compacted_chunks ++ compacted_logs)
end

# This function raises if the chunk file doesn't exist.
defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
Stream.resource(
@@ -427,13 +531,20 @@
end

defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do
opts.db
|> CubDB.select(
min_key: log_key(offset),
max_key: log_key(max_offset),
min_key_inclusive: false
)
|> Stream.map(fn {_, item} -> item end)
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
FS.LogFile.read_chunk(log, offset)

_ ->
opts.db
|> CubDB.select(
min_key: log_key(offset),
max_key: log_key(max_offset),
min_key_inclusive: false
)
|> Stream.map(fn {_, {_, _, json_log_item}} -> json_log_item end)
end
end

defp wait_for_chunk_file_or_snapshot_end(
@@ -495,14 +606,21 @@
def get_chunk_end_log_offset(offset, %FS{} = opts), do: get_chunk_end_for_log(offset, opts)

defp get_chunk_end_for_log(offset, %FS{} = opts) do
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
{:ok, max_offset, _} = FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
max_offset

:error ->
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
end
end

defp get_last_snapshot_offset(%FS{} = opts) do
@@ -0,0 +1,143 @@
defmodule Electric.ShapeCache.FileStorage.ActionFile do
@moduledoc false
alias Electric.Utils
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.ShapeCache.FileStorage.KeyIndex
import KeyIndex, only: :macros

@doc """
Convert a sorted key index to a sorted action file.

The action file is a line-for-line mapping of log file offsets to actions of "keep", "skip" or "compact".
Its ordering is the same as the log file's, to allow for sequential reads of both.

For "keep" lines, we keep the original; for "skip" lines, we skip the original; and for "compact" lines,
we read all specified JSONs from the log file and merge them into one. Multiple updates to the same
key are mapped to "skip" for all but the last one, which is then mapped to "compact".

The action file format, in Elixir binary syntax, is:

<<operation_offset::128, operation_type::binary>>

Where `operation_type` is one of:

<<?k::8>> #- Keep
<<?s::8>> #- Skip
<<?c::8, json_offsets_count::16, json_offsets::binary>> #- Compact

And `json_offsets` is `json_offsets_count` of `<<json_start_position::64, json_size::64>>`
"""
def create_from_key_index(key_index_path, action_file_path) do
KeyIndex.stream(key_index_path)
|> Stream.chunk_by(&key_index_item(&1, :key))
|> Stream.flat_map(fn chunk ->
# Chunk contains all operations for a given key in order

chunk
|> Enum.chunk_by(&key_index_item(&1, :op_type))
|> Enum.flat_map(fn
# Keep any single operation, since inserts/deletes won't be duplicated, and one update can't be compacted
[key_index_item(offset: offset)] -> [<<LogFile.offset(offset)::binary, ?k::8>>]
# If more than one, then it's definitely an update
updates -> updates_to_actions(updates)
end)
end)
|> Stream.into(File.stream!(action_file_path))
|> Stream.run()

Utils.external_merge_sort(action_file_path, &stream_for_sorting/1)
end

@doc """
Read the action file and return a stream of tuples `{offset, action}`.
"""
@spec stream(path :: String.t()) ::
Enumerable.t(
{LogFile.offset(),
:keep | :skip | {:compact, [{non_neg_integer(), non_neg_integer()}, ...]}}
)
def stream(action_file_path) do
Stream.resource(
fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
fn file ->
case IO.binread(file, 17) do
:eof ->
{:halt, file}

<<tx_offset::64, op_offset::64, ?c::8>> ->
<<count::16>> = IO.binread(file, 2)
offsets = for <<pos::64, size::64 <- IO.binread(file, 16 * count)>>, do: {pos, size}
{[{{tx_offset, op_offset}, {:compact, offsets}}], file}

<<tx_offset::64, op_offset::64, ?k::8>> ->
{[{{tx_offset, op_offset}, :keep}], file}

<<tx_offset::64, op_offset::64, ?s::8>> ->
{[{{tx_offset, op_offset}, :skip}], file}
end
end,
&File.close/1
)
end

# acc format: {positions_len, positions, actions}
defp updates_to_actions(updates, acc \\ {0, [], []})
# We don't care about order being reversed because it's going to be sorted.
defp updates_to_actions([], {_, _, acc}), do: acc

# The compaction target is either the last update for the key, or whichever update we hit after 65535 positions.
# Technically this makes the result suboptimal, but it saves a lot of memory: the position list takes up at most
# 65535 * 16 = 1048560 bytes ~ 1MB, as opposed to ~65536MB if we allowed a 32-bit position count.
defp updates_to_actions(
[key_index_item(offset: offset, json: last) | rest],
{total_positions, positions, actions}
)
when rest == []
when total_positions > 65534 do
actions =
[
[
<<LogFile.offset(offset)::binary, ?c::8, length(positions) + 1::16>>,
Utils.list_reverse_map([last | positions], fn {pos, size} -> <<pos::64, size::64>> end)
]
| actions
]

updates_to_actions(rest, {0, [], actions})
end

defp updates_to_actions(
[key_index_item(offset: offset, json: position) | rest],
{total_positions, all_positions, actions}
) do
updates_to_actions(
rest,
{total_positions + 1, [position | all_positions],
[[<<LogFile.offset(offset)::binary, ?s::8>>] | actions]}
)
end

@spec stream_for_sorting(String.t()) ::
Enumerable.t(Utils.sortable_binary({non_neg_integer(), non_neg_integer()}))
defp stream_for_sorting(action_file_path) do
Stream.resource(
fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
fn file ->
case IO.binread(file, 17) do
:eof ->
{:halt, file}

<<tx_offset::64, op_offset::64, ?c::8>> = line ->
<<count::16>> = IO.binread(file, 2)

{[{{tx_offset, op_offset}, line <> <<count::16>> <> IO.binread(file, count * 16)}],
file}

<<tx_offset::64, op_offset::64, _::8>> = line ->
{[{{tx_offset, op_offset}, line}], file}
end
end,
&File.close/1
)
end
end
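To make the action file format documented in the @doc above concrete, here is a small sketch of building and parsing entries by hand. The offsets, positions and sizes are made up for illustration; the parsing mirrors the fixed 17-byte read that ActionFile.stream/1 performs.

# Made-up offsets/positions, for illustration of the binary layout only.
keep_entry = <<10::64, 1::64, ?k::8>>
skip_entry = <<10::64, 2::64, ?s::8>>

# A "compact" entry pointing at two JSON payloads in the log file:
compact_entry = <<10::64, 3::64, ?c::8, 2::16, 100::64, 57::64, 200::64, 64::64>>

# Parsing mirrors ActionFile.stream/1: 17 bytes of offset + action tag, then,
# for ?c, a 16-bit count followed by `count` {position::64, size::64} pairs.
<<tx_offset::64, op_offset::64, ?c::8, count::16, rest::binary>> = compact_entry
json_offsets = for <<pos::64, size::64 <- rest>>, do: {pos, size}
# => tx_offset = 10, op_offset = 3, count = 2, json_offsets = [{100, 57}, {200, 64}]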