feat!: adds compaction #2231
Conversation
Exciting! Let's have two full reviews on this one.
Very nice! I'll let others dig into the exact details of the implementation/algorithm but a couple high-level things I don't see here:
It seems to be a periodic thing per shape log. We really haven't discussed strategies for running compaction. +1 for creating benchmarks to evaluate compaction in its many dimensions. Both things above are quite large in scope, so shall we merge this PR with compaction turned off by default and create follow-up issues to investigate performance and scheduling strategies?
Awesome! Very cool. The comments are very helpful too. I've left a few comments, mostly nit-picks.
@@ -324,7 +326,7 @@ defmodule Electric.ShapeCache.FileStorage do
      log_items
      |> Enum.map(fn
        {:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
-       {offset, json_log_item} -> {log_key(offset), json_log_item}
+       {offset, key, type, json_log_item} -> {log_key(offset), {key, type, json_log_item}}
`type` is the operation, right? Please could it be renamed as such?
And the various other places in lib and test that `type` is used.
I'd also accept `op_type`, as you've used elsewhere.
@@ -0,0 +1,52 @@
defmodule Electric.ShapeCache.StorageCleaner do
`StorageCompactor` or just `Compactor` would be clearer.
Or if you follow my other advice of refactoring parts of `OnFile` into `Compaction`, then this could be called `CompactionRunner`.
  max_key: chunk_checkpoint_end(),
  reverse: true
)
# Keep the last 2 chunks as-is just in case
In case of what?
Because we don't want to lose idempotency of writes from Postgres to the same txn.
So why 2?
Vibes, mostly. I'm assuming that with over 20MB of data we're far enough from the tip that we don't care about this property (it only happens when the PG connection drops at an unfortunate moment where we've written the log but didn't ack, so only a few transactions at the tip can be affected). 20MB seems like a reasonable buffer.
So currently we ack after every transaction, right? You could have a transaction that was over 20MB. That also seems like a likely time for Electric to crash (but then succeed on restart). Could this result in duplicate entries?
So that's 3 transactions over 10MB (to make separate chunks) coming in, then the compaction triggers, and we start 3 txns back? I assume it's possible, just not likely. Since we're tracking the compaction boundary in CubDB, we can check it before writing the incoming transactions and just discard those that are definitely in the past.
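As a rough sketch of that failsafe (the boundary key and the log item shapes below are assumptions for illustration, not this PR's actual code):

```elixir
# Hypothetical guard: drop incoming log items whose offsets fall at or before
# the recorded compaction boundary, since the compacted log already covers them.
defp discard_already_compacted(log_items, db) do
  case CubDB.get(db, :compaction_boundary) do
    nil ->
      log_items

    boundary ->
      Enum.reject(log_items, fn
        {:chunk_boundary, offset} -> offset <= boundary
        {offset, _key, _op_type, _json_log_item} -> offset <= boundary
      end)
  end
end
```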
Ah, ok. So you're saying transactions cannot span chunks. In which case, as we ack after every transaction, I think we're safe, right? Or is there some scenario where we could start 3 txns back?
More broadly I'm making a few points:
- this explanation for why we keep the last two chunks (much of the detail of this thread) should be in the comment
- how we ack may change in the future making this more of an issue, so it would be good to guard against it
- I don't like dismissing issues as unlikely. I've seen plenty of "unlikely" bugs occur :) We should put a failsafe in, even if that failsafe is to log and crash horribly (your "discard those that are definitely in the past" idea seems a much nicer failsafe!)
File.rename!(log_file_path <> ".compacted" <> ".chunk_index", log_file_path <> ".chunk_index")
File.rename!(log_file_path <> ".compacted", log_file_path)
A comment describing how race-conditions are avoided when compaction is concurrent with reading would be handy.
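For illustration, the requested comment might look something like the sketch below; the claims about reader behaviour are my assumptions and would need checking against how readers actually open these files:

```elixir
# Swapping the compacted files into place relies on rename(2) being atomic when
# source and destination are on the same filesystem:
#   * a reader that opened the old log before the swap keeps a valid file
#     descriptor and keeps reading the old data; the old inode is only reclaimed
#     once that descriptor is closed,
#   * a reader that opens the path after the swap sees only the fully written
#     compacted file, assuming the ".compacted" files are completely written
#     (and ideally fsynced) before the rename.
File.rename!(log_file_path <> ".compacted" <> ".chunk_index", log_file_path <> ".chunk_index")
File.rename!(log_file_path <> ".compacted", log_file_path)
```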
@@ -0,0 +1,535 @@
defmodule Electric.ShapeCache.FileStorage.OnDisk do
There's a lot going on in this file. I can't help but feel that splitting it up and giving it some abstractions would help with comprehending it.
Perhaps it could be split into:
- FileStorage.LogFile
- FileStorage.Compaction
- FileStorage.Compaction.ActionFile
- FileStorage.Compaction.KeyIndex
- FileStorage.Compaction.ChunkIndex
In general, the complexity of the compaction is O(n log(n)), but the multiplier is not trivial because of random disk access. In practice:
I don't see how we can improve the last part, where we need to read all the updates to merge them, without loading more stuff into memory, but we could e.g. use a Rust/Zig implementation of an external merge sort quite easily to lower overheads there.
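For reference, a single merge step of an external merge sort is small enough to sketch in Elixir; the file layout (one sorted entry per line) and all names below are assumptions, and the only point is that neither run is loaded fully into memory:

```elixir
defmodule CompactionMergeSketch do
  @moduledoc "Illustrative only: merge two line-sorted run files into one."

  def merge_runs(path_a, path_b, out_path) do
    {:ok, a} = :file.open(path_a, [:read, :raw, :binary, :read_ahead])
    {:ok, b} = :file.open(path_b, [:read, :raw, :binary, :read_ahead])
    {:ok, out} = :file.open(out_path, [:write, :raw, :binary, :delayed_write])

    merge(:file.read_line(a), :file.read_line(b), a, b, out)

    Enum.each([a, b, out], &:file.close/1)
  end

  # Both runs exhausted: done.
  defp merge(:eof, :eof, _a, _b, _out), do: :ok

  # One run exhausted: copy the rest of the other one through.
  defp merge(:eof, {:ok, line}, a, b, out) do
    :ok = :file.write(out, line)
    merge(:eof, :file.read_line(b), a, b, out)
  end

  defp merge({:ok, line}, :eof, a, b, out) do
    :ok = :file.write(out, line)
    merge(:file.read_line(a), :eof, a, b, out)
  end

  # Emit the smaller head line and advance only that run.
  defp merge({:ok, la}, {:ok, lb}, a, b, out) when la <= lb do
    :ok = :file.write(out, la)
    merge(:file.read_line(a), {:ok, lb}, a, b, out)
  end

  defp merge({:ok, la}, {:ok, lb}, a, b, out) do
    :ok = :file.write(out, lb)
    merge({:ok, la}, :file.read_line(b), a, b, out)
  end
end
```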
What I've been imagining is we'd also want to measure the amount of "entropy" in the log — we don't want to do compaction on large logs every 10 minutes unless there's been a ton of new updates.
Cool — benchmarks + server metrics on live servers will tell us if this is fine or not soon enough.
We need to put this feature behind a feature flag, given that at this point we don't have enough knowledge about how it will impact the system. We will do follow-up work to learn about compaction scheduling and performance impact. One idea is to use the custom params when creating a shape to enable compaction for that shape. Custom params could be labeled as experimental, so we can remove it later.
I think a potentially simple way to measure, and trigger, this is "log churn":
We are only compacting updates (do we remove all updates if there is a delete? I'm assuming we do), so churn = 2 indicates that each row has had at least an insert plus an update/delete on average. Therefore, we want to run compaction when churn is some level higher than 2. Maybe 2.5 is a good starting point for experiments. There are more sophisticated ways to measure this that would take "hot records" into account, but this seems nice and simple.
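To make that concrete, a tiny sketch of what the check could look like; the counters and threshold are illustrative, not anything this PR tracks today:

```elixir
defmodule ChurnSketch do
  # Hypothetical churn metric: operations written to the shape log divided by
  # distinct rows (keys) touched. One insert per row gives churn = 1; an extra
  # update/delete per row on average gives churn = 2, so compact above ~2.5.
  @churn_threshold 2.5

  def churn(_total_ops, 0), do: 0.0
  def churn(total_ops, distinct_keys), do: total_ops / distinct_keys

  def compact?(total_ops, distinct_keys),
    do: churn(total_ops, distinct_keys) > @churn_threshold
end
```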
Kafka's algorithm for triggering compaction, btw, is to track a "dirty ratio", i.e. for us, what % of messages are duplicate updates (see https://developer.confluent.io/courses/architecture/compaction/#when-compaction-is-triggered). We could track something similar in the shape log metadata, so whenever we hit say ~30% dirty updates, we run compaction.
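The Kafka-style variant would be an equally small check (again purely illustrative):

```elixir
defmodule DirtyRatioSketch do
  # Hypothetical dirty ratio: fraction of log entries that are updates to keys
  # already present in the log; compact once it crosses ~30%.
  @dirty_threshold 0.3

  def compact?(_duplicate_updates, 0), do: false

  def compact?(duplicate_updates, total_entries),
    do: duplicate_updates / total_entries >= @dirty_threshold
end
```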
This PR introduces compaction for the FileStorage. Compaction follows some rules:
Compaction doesn't currently affect "live tail" storage - we're still using CubDB for that, but compacted data is moved out of CubDB.
On-disk format for the log is
With a supporting chunk index that allows aligning reads for all clients and acts as a sparse index at the same time: the client comes with an offset, we find the chunk to serve them from, and then serve only the part of that chunk they've not seen, same as we're doing right now.
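As a rough sketch of that read path (the index layout and the `stream_log_entries` helper below are assumptions for illustration, not the PR's actual on-disk format):

```elixir
# Hypothetical chunk-index lookup: the index is assumed to be a list of
# {min_offset, max_offset, start_pos, end_pos} tuples sorted by offset.
def serve_log(chunk_index, log_file_path, client_offset) do
  case Enum.find(chunk_index, fn {_min, max_offset, _s, _e} -> max_offset > client_offset end) do
    nil ->
      # Client is already at (or past) the tip of the compacted log.
      []

    {_min, _max, start_pos, end_pos} ->
      # Read only this chunk's byte range and skip entries the client has seen.
      log_file_path
      |> stream_log_entries(start_pos, end_pos)
      |> Stream.filter(fn {offset, _key, _op, _json} -> offset > client_offset end)
  end
end
```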