feat!: adds compaction #2231
base: main
Changes from all commits
@@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do
  use Retry
  require Logger

  alias Electric.ShapeCache.LogChunker
  alias Electric.Telemetry.OpenTelemetry
  alias Electric.Replication.LogOffset
  import Electric.Replication.LogOffset, only: :macros

@@ -17,6 +18,7 @@ defmodule Electric.ShapeCache.FileStorage do
  @xmin_key :snapshot_xmin
  @snapshot_meta_key :snapshot_meta
  @snapshot_started_key :snapshot_started
  @compaction_info_key :compaction_info

  @behaviour Electric.ShapeCache.Storage

@@ -27,8 +29,10 @@ defmodule Electric.ShapeCache.FileStorage do
    :data_dir,
    :cubdb_dir,
    :snapshot_dir,
    :log_dir,
    :stack_id,
    :extra_opts,
    :chunk_bytes_threshold,
    version: @version
  ]

@@ -38,7 +42,12 @@ defmodule Electric.ShapeCache.FileStorage do
    storage_dir = Keyword.get(opts, :storage_dir, "./shapes")

    # Always scope the provided storage dir by stack id
    %{base_path: Path.join(storage_dir, stack_id), stack_id: stack_id}
    %{
      base_path: Path.join(storage_dir, stack_id),
      stack_id: stack_id,
      chunk_bytes_threshold:
        Keyword.get(opts, :chunk_bytes_threshold, LogChunker.default_chunk_size_threshold())
    }
  end

  @impl Electric.ShapeCache.Storage

@@ -59,8 +68,10 @@ defmodule Electric.ShapeCache.FileStorage do
      data_dir: data_dir,
      cubdb_dir: Path.join([data_dir, "cubdb"]),
      snapshot_dir: Path.join([data_dir, "snapshots"]),
      log_dir: Path.join([data_dir, "log"]),
      stack_id: stack_id,
      extra_opts: Map.get(opts, :extra_opts, %{})
      extra_opts: Map.get(opts, :extra_opts, %{}),
      chunk_bytes_threshold: opts.chunk_bytes_threshold
    }
  end

@@ -87,7 +98,8 @@ defmodule Electric.ShapeCache.FileStorage do
  defp initialise_filesystem(opts) do
    with :ok <- File.mkdir_p(opts.data_dir),
         :ok <- File.mkdir_p(opts.cubdb_dir),
         :ok <- File.mkdir_p(opts.snapshot_dir) do
         :ok <- File.mkdir_p(opts.snapshot_dir),
         :ok <- File.mkdir_p(opts.log_dir) do
      :ok
    end
  end

@@ -324,7 +336,7 @@ defmodule Electric.ShapeCache.FileStorage do
      log_items
      |> Enum.map(fn
        {:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
        {offset, json_log_item} -> {log_key(offset), json_log_item}
        {offset, key, type, json_log_item} -> {log_key(offset), {key, type, json_log_item}}
      end)
      |> then(&CubDB.put_multi(opts.db, &1))
    else

@@ -371,6 +383,98 @@ defmodule Electric.ShapeCache.FileStorage do
  def get_log_stream(%LogOffset{} = offset, max_offset, %FS{} = opts),
    do: stream_log_chunk(offset, max_offset, opts)

  def compact(%FS{} = opts) do
    CubDB.select(opts.db,
      min_key: chunk_checkpoint_start(),
      max_key: chunk_checkpoint_end(),
      reverse: true
    )
    # Keep the last 2 chunks as-is just in case
In case of what?

Because we don't want to lose idempotency of writes from Postgres to the same txn.

So why 2?

Vibes, mostly. I'm assuming that over 20MB of data we're far enough from the tip that we don't care about this property (it only happens when the PG connection drops at an unfortunate moment where we've written the log but didn't ack), so only a few transactions at the tip can be affected. 20MB seems like a reasonable buffer.

So currently we ack after every transaction, right? You could have a transaction that was over 20MB. That also seems like a likely time for Electric to crash (but then succeed on restart). Could this result in duplicate entries?

So that's 3 transactions over 10MB (to make separate chunks) coming in, then the compaction triggers, and we start 3 txns back? I assume it's possible, just not likely. Since we're tracking the compaction boundary in CubDB, we can check it before writing the incoming transactions and just discard those that are definitely in the past.

Ah, ok. So you're saying transactions cannot span chunks. In which case, as we ack after every transaction, I think we're safe, right? Or is there some scenario where we could start 3 txns back? More broadly I'm making a few points:
    |> Enum.take(3)
    |> case do
      [_, _, {key, _}] ->
        compact(opts, offset(key))

      _ ->
        # Not enough chunks to warrant compaction
        :ok
    end
  end
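To make the trigger condition above concrete, here is a minimal, self-contained sketch of the boundary selection in `compact/1`: with chunk checkpoints listed newest-first, the two newest chunks are left untouched and the third-newest checkpoint becomes the compaction upper bound. The integer offsets below are made up for illustration; the real code works with `%LogOffset{}` keys.

```elixir
# Toy illustration only — plain integers stand in for chunk checkpoint offsets.
checkpoints_newest_first = [120, 90, 60, 30, 10]

case Enum.take(checkpoints_newest_first, 3) do
  # The two newest chunks are kept as-is; the third-newest bounds the compaction.
  [_, _, upper_bound] -> {:compact_up_to, upper_bound}
  # Fewer than three chunks: not enough data to warrant compaction.
  _ -> :ok
end
#=> {:compact_up_to, 60}
```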

  def compact(%FS{} = opts, %LogOffset{} = upper_bound) do
    case CubDB.fetch(opts.db, @compaction_info_key) do
      {:ok, {_, ^upper_bound}} ->
        :ok

      {:ok, {old_log, _}} ->
        # compact further
        new_log_file_path =
          Path.join(
            opts.log_dir,
            "compact_log_#{DateTime.utc_now() |> DateTime.to_unix(:millisecond)}.electric"
          )

        new_log =
          CubDB.select(opts.db,
            min_key: log_start(),
            max_key: log_key(upper_bound),
            max_key_inclusive: true
          )
          |> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
          |> FS.LogFile.write_log_file(new_log_file_path <> ".new")

        merged_log =
          FS.Compaction.merge_and_compact(
            old_log,
            new_log,
            new_log_file_path,
            opts.chunk_bytes_threshold
          )

        CubDB.put(opts.db, @compaction_info_key, {merged_log, upper_bound})
        delete_compacted_keys(opts, upper_bound)
        FS.Compaction.rm_log(new_log)
        FS.Compaction.rm_log(old_log)
        :ok

      :error ->
        log_file_path = Path.join(opts.log_dir, "compact_log.electric")

        log =
          CubDB.select(opts.db,
            min_key: log_start(),
            max_key: log_key(upper_bound),
            max_key_inclusive: true
          )
          |> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
          |> FS.LogFile.write_log_file(log_file_path)
          |> FS.Compaction.compact_in_place(opts.chunk_bytes_threshold)

        CubDB.put(opts.db, @compaction_info_key, {log, upper_bound})
        delete_compacted_keys(opts, upper_bound)
        :ok
    end
  end
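A sketch of the decision tree in `compact/2` above, driven by what is stored under `@compaction_info_key` (a `{compacted_log, upper_bound}` pair). The module, function name and return values below are invented for illustration; only the branch structure mirrors the real code.

```elixir
# Hypothetical sketch of the three branches of compact/2 (names are illustrative).
defmodule CompactionPlan do
  # Already compacted up to the requested bound: nothing to do.
  def plan({:ok, {_log, bound}}, bound), do: :noop

  # Compacted before, but to an older bound: write the new suffix to its own
  # file and merge it with the existing compacted log.
  def plan({:ok, {old_log, _older_bound}}, _bound), do: {:merge_with, old_log}

  # Never compacted: write the whole prefix out and compact it in place.
  def plan(:error, _bound), do: :fresh_compaction
end

CompactionPlan.plan(:error, 60)               #=> :fresh_compaction
CompactionPlan.plan({:ok, {"log.1", 30}}, 60) #=> {:merge_with, "log.1"}
CompactionPlan.plan({:ok, {"log.1", 60}}, 60) #=> :noop
```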

  defp delete_compacted_keys(%FS{} = opts, upper_bound) do
    compacted_chunks =
      CubDB.select(opts.db,
        min_key: chunk_checkpoint_start(),
        max_key: chunk_checkpoint_key(upper_bound),
        max_key_inclusive: true
      )
      |> Enum.map(fn {key, _} -> key end)

    compacted_logs =
      CubDB.select(opts.db,
        min_key: log_start(),
        max_key: log_key(upper_bound)
      )
      |> Enum.map(fn {key, _} -> key end)

    CubDB.delete_multi(opts.db, compacted_chunks ++ compacted_logs)
  end

  # This function raises if the chunk file doesn't exist.
  defp stream_snapshot_chunk!(%FS{} = opts, chunk_number) do
    Stream.resource(

@@ -427,13 +531,20 @@ defmodule Electric.ShapeCache.FileStorage do
  end

  defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do
    opts.db
    |> CubDB.select(
      min_key: log_key(offset),
      max_key: log_key(max_offset),
      min_key_inclusive: false
    )
    |> Stream.map(fn {_, item} -> item end)
    case CubDB.fetch(opts.db, @compaction_info_key) do
      {:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
        FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
        FS.LogFile.read_chunk(log, offset)

      _ ->
        opts.db
        |> CubDB.select(
          min_key: log_key(offset),
          max_key: log_key(max_offset),
          min_key_inclusive: false
        )
        |> Stream.map(fn {_, {_, _, json_log_item}} -> json_log_item end)
    end
  end
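The read path above now dispatches on the compaction bound: offsets strictly below it are served from the compacted log file, everything else still streams from CubDB. A toy version of just that dispatch, with plain integers instead of `%LogOffset{}` values and an invented `read_path` helper:

```elixir
# Toy dispatch mirroring the choice made in stream_log_chunk/3.
read_path = fn offset, compaction_info ->
  case compaction_info do
    {:ok, {_log, upper_bound}} when offset < upper_bound -> :compacted_file
    _ -> :cubdb
  end
end

read_path.(10, {:ok, {"compact_log.electric", 60}}) #=> :compacted_file
read_path.(75, {:ok, {"compact_log.electric", 60}}) #=> :cubdb
read_path.(10, :error)                              #=> :cubdb
```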

  defp wait_for_chunk_file_or_snapshot_end(

@@ -495,14 +606,21 @@ defmodule Electric.ShapeCache.FileStorage do
  def get_chunk_end_log_offset(offset, %FS{} = opts), do: get_chunk_end_for_log(offset, opts)

  defp get_chunk_end_for_log(offset, %FS{} = opts) do
    CubDB.select(opts.db,
      min_key: chunk_checkpoint_key(offset),
      max_key: chunk_checkpoint_end(),
      min_key_inclusive: false
    )
    |> Stream.map(fn {key, _} -> offset(key) end)
    |> Enum.take(1)
    |> Enum.at(0)
    case CubDB.fetch(opts.db, @compaction_info_key) do
      {:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
        {:ok, max_offset, _} = FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
        max_offset

      :error ->
        CubDB.select(opts.db,
          min_key: chunk_checkpoint_key(offset),
          max_key: chunk_checkpoint_end(),
          min_key_inclusive: false
        )
        |> Stream.map(fn {key, _} -> offset(key) end)
        |> Enum.take(1)
        |> Enum.at(0)
    end
  end

  defp get_last_snapshot_offset(%FS{} = opts) do
@@ -0,0 +1,143 @@
defmodule Electric.ShapeCache.FileStorage.ActionFile do
  @moduledoc false
  alias Electric.Utils
  alias Electric.ShapeCache.FileStorage.LogFile
  alias Electric.ShapeCache.FileStorage.KeyIndex
  import KeyIndex, only: :macros

  @doc """
  Convert a sorted key index to a sorted action file.

  The action file is a line-for-line mapping of log file offsets to actions of "keep", "skip" or "compact".
  Its ordering should be the same as the log file's, to allow for sequential reads of both.

  For "keep" lines, we keep the original; for "skip" lines, we skip the original; and for "compact" lines,
  we read all specified JSONs from the log file and merge them into one. Multiple updates to the same
  key are mapped to be "skipped" for all but the last one, which is then mapped to "compact".

  The action file format is, in Elixir binary syntax:

      <<operation_offset::128, operation_type::binary>>

  Where `operation_type` is one of:

      <<?k::8>> #- Keep
      <<?s::8>> #- Skip
      <<?c::8, json_offsets_count::16, json_offsets::binary>> #- Compact

  And `json_offsets` is `json_offsets_count` repetitions of `<<json_start_position::64, json_size::64>>`.
  """
  def create_from_key_index(key_index_path, action_file_path) do
    KeyIndex.stream(key_index_path)
    |> Stream.chunk_by(&key_index_item(&1, :key))
    |> Stream.flat_map(fn chunk ->
      # Chunk contains all operations for a given key in order

      chunk
      |> Enum.chunk_by(&key_index_item(&1, :op_type))
      |> Enum.flat_map(fn
        # Keep any single operation, since inserts/deletes won't be duplicated, and one update can't be compacted
        [key_index_item(offset: offset)] -> [<<LogFile.offset(offset)::binary, ?k::8>>]
        # If more than one, then it's definitely an update
        updates -> updates_to_actions(updates)
      end)
    end)
    |> Stream.into(File.stream!(action_file_path))
    |> Stream.run()

    Utils.external_merge_sort(action_file_path, &stream_for_sorting/1)
  end

  @doc """
  Read the action file and return a stream of tuples `{offset, action}`.
  """
  @spec stream(path :: String.t()) ::
          Enumerable.t(
            {LogFile.offset(),
             :keep | :skip | {:compact, [{non_neg_integer(), non_neg_integer()}, ...]}}
          )
  def stream(action_file_path) do
    Stream.resource(
      fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
      fn file ->
        case IO.binread(file, 17) do
          :eof ->
            {:halt, file}

          <<tx_offset::64, op_offset::64, ?c::8>> ->
            <<count::16>> = IO.binread(file, 2)
            offsets = for <<pos::64, size::64 <- IO.binread(file, 16 * count)>>, do: {pos, size}
            {[{{tx_offset, op_offset}, {:compact, offsets}}], file}

          <<tx_offset::64, op_offset::64, ?k::8>> ->
            {[{{tx_offset, op_offset}, :keep}], file}

          <<tx_offset::64, op_offset::64, ?s::8>> ->
            {[{{tx_offset, op_offset}, :skip}], file}
        end
      end,
      &File.close/1
    )
  end

  # acc format: {positions_len, positions, actions}
  defp updates_to_actions(updates, acc \\ {0, [], []})
  # We don't care about order being reversed because it's going to be sorted.
  defp updates_to_actions([], {_, _, acc}), do: acc

  # The compaction target is either the last update, or the one reached after we hit 65535 updates. Technically that makes it suboptimal,
  # but it saves us a lot of memory because the position list will take up at most 65535 * 16 = 1048560 bytes ~ 1MB of memory,
  # as opposed to 65536MB if we allow int32 positions.
  defp updates_to_actions(
         [key_index_item(offset: offset, json: last) | rest],
         {total_positions, positions, actions}
       )
       when rest == []
       when total_positions > 65534 do
    actions =
      [
        [
          <<LogFile.offset(offset)::binary, ?c::8, length(positions) + 1::16>>,
          Utils.list_reverse_map([last | positions], fn {pos, size} -> <<pos::64, size::64>> end)
        ]
        | actions
      ]

    updates_to_actions(rest, {0, [], actions})
  end

  defp updates_to_actions(
         [key_index_item(offset: offset, json: position) | rest],
         {total_positions, all_positions, actions}
       ) do
    updates_to_actions(
      rest,
      {total_positions + 1, [position | all_positions],
       [[<<LogFile.offset(offset)::binary, ?s::8>>] | actions]}
    )
  end

  @spec stream_for_sorting(String.t()) ::
          Enumerable.t(Utils.sortable_binary({non_neg_integer(), non_neg_integer()}))
  defp stream_for_sorting(action_file_path) do
    Stream.resource(
      fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
      fn file ->
        case IO.binread(file, 17) do
          :eof ->
            {:halt, file}

          <<tx_offset::64, op_offset::64, ?c::8>> = line ->
            <<count::16>> = IO.binread(file, 2)

            {[{{tx_offset, op_offset}, line <> <<count::16>> <> IO.binread(file, count * 16)}],
             file}

          <<tx_offset::64, op_offset::64, _::8>> = line ->
            {[{{tx_offset, op_offset}, line}], file}
        end
      end,
      &File.close/1
    )
  end
end
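The record layout documented in the `@doc` above can be exercised directly with binary pattern matching. Here is a hand-rolled round trip, for illustration only; the offsets, sizes and the `decode` helper are made up and not part of the module:

```elixir
# Encode one record of each documented kind: <<tx_offset::64, op_offset::64>> followed by a tag byte.
keep    = <<1::64, 2::64, ?k::8>>
skip    = <<1::64, 3::64, ?s::8>>
compact = <<1::64, 4::64, ?c::8, 2::16, 100::64, 25::64, 200::64, 30::64>>

decode = fn
  <<tx::64, op::64, ?k::8>> ->
    {{tx, op}, :keep}

  <<tx::64, op::64, ?s::8>> ->
    {{tx, op}, :skip}

  <<tx::64, op::64, ?c::8, count::16, rest::binary>> ->
    # `rest` is `count` pairs of <<json_start_position::64, json_size::64>>.
    offsets = for <<pos::64, size::64 <- rest>>, do: {pos, size}
    ^count = length(offsets)
    {{tx, op}, {:compact, offsets}}
end

decode.(keep)    #=> {{1, 2}, :keep}
decode.(skip)    #=> {{1, 3}, :skip}
decode.(compact) #=> {{1, 4}, {:compact, [{100, 25}, {200, 30}]}}
```

This also shows why `stream/1` reads a fixed 17-byte head first and only then, for the `?c` tag, does a second read for the variable-length offset list.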
`type` is `operation`, right? Please could it be renamed as such?

And the various other places in lib and test that `type` is used.

I'd also accept `op_type` as you've used elsewhere.