offset-ordered compaction
icehaunter committed Jan 16, 2025
1 parent af4208a commit aa8e1a7
Showing 13 changed files with 1,119 additions and 20 deletions.
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/log_items.ex
@@ -126,4 +126,13 @@ defmodule Electric.LogItems do
     |> Enum.zip()
     |> Map.new()
   end
+
+  def merge_updates(u1, u2) do
+    %{
+      "key" => u1["key"],
+      "offset" => u2["offset"],
+      "headers" => Map.take(u1["headers"], ["operation", "relation"]),
+      "value" => Map.merge(u1["value"], u2["value"])
+    }
+  end
 end
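The new merge_updates/2 collapses two updates to the same row into a single log item: the result keeps the first update's key, takes the second (later) update's offset, trims the headers down to "operation" and "relation", and merges the value maps so the later update's columns win. A minimal sketch of that behaviour; the map literals below are hypothetical examples in log-item shape, not values from the commit:

```elixir
u1 = %{
  "key" => ~S|"public"."users"/"1"|,
  "offset" => "100_0",
  "headers" => %{"operation" => "update", "relation" => ["public", "users"], "txids" => [100]},
  "value" => %{"id" => "1", "name" => "Alice"}
}

u2 = %{
  "key" => ~S|"public"."users"/"1"|,
  "offset" => "200_0",
  "headers" => %{"operation" => "update", "relation" => ["public", "users"], "txids" => [200]},
  "value" => %{"name" => "Bob"}
}

Electric.LogItems.merge_updates(u1, u2)
# => %{
#      "key" => ~S|"public"."users"/"1"|,      # first update's key
#      "offset" => "200_0",                    # later offset wins
#      "headers" => %{"operation" => "update", "relation" => ["public", "users"]},
#      "value" => %{"id" => "1", "name" => "Bob"}  # later column values win
#    }
```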
80 changes: 72 additions & 8 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do
   use Retry
   require Logger
 
+  alias Electric.ShapeCache.FileStorage.OnDisk
   alias Electric.Telemetry.OpenTelemetry
   alias Electric.Replication.LogOffset
   import Electric.Replication.LogOffset, only: :macros
@@ -17,6 +18,7 @@ defmodule Electric.ShapeCache.FileStorage do
   @xmin_key :snapshot_xmin
   @snapshot_meta_key :snapshot_meta
   @snapshot_started_key :snapshot_started
+  @compaction_info_key :compaction_info
 
   @behaviour Electric.ShapeCache.Storage
 
@@ -324,7 +326,7 @@ defmodule Electric.ShapeCache.FileStorage do
       log_items
       |> Enum.map(fn
         {:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
-        {offset, json_log_item} -> {log_key(offset), json_log_item}
+        {offset, key, type, json_log_item} -> {log_key(offset), {key, type, json_log_item}}
       end)
       |> then(&CubDB.put_multi(opts.db, &1))
     else
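Note the new shape of a stored log entry in the hunk above: instead of the bare JSON string, CubDB now holds a {key, type, json_log_item} tuple under each log key, so the compaction pass further down can group entries by row key and operation type without decoding JSON. Illustratively; the concrete key format and the type representation are assumptions, not shown in this excerpt:

```elixir
# Value stored under log_key(offset), before and after this commit:
#   before: json_log_item
#   after:  {op_key, op_type, json_log_item}
op_key = ~S|"public"."users"/"1"|             # hypothetical row key
op_type = :update                             # representation assumed; not visible here
json_log_item = ~S|{"value":{"name":"Bob"}}|  # hypothetical serialized item
entry = {op_key, op_type, json_log_item}
```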
@@ -371,6 +373,62 @@ defmodule Electric.ShapeCache.FileStorage do
   def get_log_stream(%LogOffset{} = offset, max_offset, %FS{} = opts),
     do: stream_log_chunk(offset, max_offset, opts)
 
+  def compact(%FS{} = opts) do
+    CubDB.select(opts.db,
+      min_key: chunk_checkpoint_start(),
+      max_key: chunk_checkpoint_end(),
+      reverse: true
+    )
+    # Keep the last 2 chunks as-is just in case
+    |> Enum.take(3)
+    |> case do
+      [_, _, {key, _}] ->
+        compact(opts, offset(key))
+
+      _ ->
+        # Not enough chunks to warrant compaction
+        :ok
+    end
+  end
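A simplified model of the selection logic above, using bare integers in place of chunk-checkpoint entries: the reverse select yields checkpoints newest-first, Enum.take/2 grabs at most three, and the match only succeeds when a third-newest checkpoint exists, which is how the two most recent chunks always stay untouched:

```elixir
# Simplified: checkpoint offsets newest-first, as reverse: true would yield them.
pick_bound = fn checkpoints ->
  case Enum.take(checkpoints, 3) do
    [_, _, third_newest] -> {:compact_up_to, third_newest}
    _ -> :ok  # fewer than three chunks: not worth compacting yet
  end
end

pick_bound.([40, 30, 20, 10])  # => {:compact_up_to, 20}; chunks 30 and 40 stay as-is
pick_bound.([40, 30])          # => :ok
```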

+  def compact(%FS{} = opts, %LogOffset{} = upper_bound) do
+    log_file_path = opts.data_dir <> "/compact_log.electric"
+
+    CubDB.select(opts.db,
+      min_key: log_start(),
+      max_key: log_key(upper_bound),
+      max_key_inclusive: true
+    )
+    |> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
+    |> OnDisk.write_log_file(log_file_path)
+
+    key_index_path = OnDisk.create_sorted_key_index(log_file_path)
+    action_file_path = OnDisk.create_action_file(log_file_path, key_index_path)
+
+    OnDisk.apply_actions(log_file_path, action_file_path)
+
+    CubDB.put(opts.db, @compaction_info_key, {log_file_path, upper_bound})
+
+    compacted_chunks =
+      CubDB.select(opts.db,
+        min_key: chunk_checkpoint_start(),
+        max_key: chunk_checkpoint_key(upper_bound),
+        max_key_inclusive: true
+      )
+      |> Enum.map(fn {key, _} -> key end)
+
+    compacted_logs =
+      CubDB.select(opts.db,
+        min_key: log_start(),
+        max_key: log_key(upper_bound)
+      )
+      |> Enum.map(fn {key, _} -> key end)
+
+    CubDB.delete_multi(opts.db, compacted_chunks ++ compacted_logs)
+
+    :ok
+  end
+
   # This function raises if the chunk file doesn't exist.
   defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
     Stream.resource(
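The ordering inside compact/2 above is what makes it safe for concurrent readers: the compacted flat file is fully written (write_log_file, then the key index, the action file, and apply_actions) before @compaction_info_key is published, and the superseded CubDB entries are deleted only after that. At any instant an offset below the bound is therefore readable from CubDB, from the compacted file, or briefly from both, never from neither. A hedged usage sketch; the driver code is hypothetical, and compact/1 picks the bound itself as shown above:

```elixir
alias Electric.ShapeCache.FileStorage

# opts is the %FileStorage{} handle for one shape; construction elided here.
:ok = FileStorage.compact(opts)

# Everything up to the third-newest chunk boundary now lives in
# "#{opts.data_dir}/compact_log.electric"; newer entries stay in CubDB.
# Readers need no coordination: stream_log_chunk/3 (below) checks
# @compaction_info_key per request and picks the right source.
```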
@@ -427,13 +485,19 @@ defmodule Electric.ShapeCache.FileStorage do
   end
 
   defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do
-    opts.db
-    |> CubDB.select(
-      min_key: log_key(offset),
-      max_key: log_key(max_offset),
-      min_key_inclusive: false
-    )
-    |> Stream.map(fn {_, item} -> item end)
+    case CubDB.fetch(opts.db, @compaction_info_key) do
+      {:ok, {log_file_path, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
+        OnDisk.read_json_chunk(log_file_path, offset)
+
+      _ ->
+        opts.db
+        |> CubDB.select(
+          min_key: log_key(offset),
+          max_key: log_key(max_offset),
+          min_key_inclusive: false
+        )
+        |> Stream.map(fn {_, {_, _, json_log_item}} -> json_log_item end)
+    end
   end
 
   defp wait_for_chunk_file_or_snapshot_end(
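The reworked stream_log_chunk/3 above is where compaction becomes visible to readers: when compaction info exists and the requested offset lies strictly below the recorded bound, the JSON chunk is served from the flat file via OnDisk.read_json_chunk/2; otherwise the request falls through to the live CubDB range scan, now unpacking the {key, type, json} tuples. The dispatch condition in isolation; the offsets are illustrative, LogOffset field names are as used elsewhere in this codebase, and is_log_offset_lt/2 is the guard imported at the top of the module:

```elixir
alias Electric.Replication.LogOffset
import Electric.Replication.LogOffset, only: :macros

read_offset = %LogOffset{tx_offset: 10, op_offset: 0}
upper_bound = %LogOffset{tx_offset: 100, op_offset: 0}

if is_log_offset_lt(read_offset, upper_bound) do
  :compacted_file  # OnDisk.read_json_chunk/2 path
else
  :live_log        # CubDB.select/2 path
end
```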
[Diff truncated: the remaining 11 changed files are not shown.]
