
feat!: adds compaction #2231

Open · icehaunter wants to merge 4 commits into main from ilia/feat/compaction

Conversation

icehaunter
Contributor

This PR introduces compaction for the FileStorage. Compaction follows some rules:

  1. Relative order of insert/update/delete operations over the same row is preserved: a consumer cannot see an update before the insert
  2. insert/delete operations are never compacted
  3. updates to the same row are compacted into one
  4. clients can continue reading from the same offsets as before the compaction and shouldn't see inserts/deletes they've already seen - they may see updates they've already seen again, folded into another, later update
  5. Live tail is not affected by compaction in order to preserve idempotent inserts of already-seen transactions from Postgres

Compaction doesn't currently affect "live tail" storage - we're still using CubDB for that, but compacted data is moved out of CubDB.
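
For illustration, a minimal sketch of rule 3, assuming each operation carries a map of changed columns and that operations have already been brought together per key (the way the sorted key index in the real implementation does); the module and function names here are illustrative, not the PR's actual code:

defmodule CompactionSketch do
  # Each op is {key, op_type, changes}. Inserts and deletes pass through untouched
  # (rule 2); runs of updates to the same key collapse into one merged update (rule 3).
  def compact(ops) do
    ops
    |> Enum.chunk_by(fn {key, op_type, _changes} -> {key, op_type == :update} end)
    |> Enum.flat_map(fn
      [{key, :update, _} | _] = updates ->
        merged =
          Enum.reduce(updates, %{}, fn {_key, :update, changes}, acc ->
            # Later updates win column-by-column.
            Map.merge(acc, changes)
          end)

        [{key, :update, merged}]

      other ->
        other
    end)
  end
end

# CompactionSketch.compact([
#   {"users/1", :insert, %{"name" => "a"}},
#   {"users/1", :update, %{"name" => "b"}},
#   {"users/1", :update, %{"email" => "b@example.com"}}
# ])
# #=> [{"users/1", :insert, %{"name" => "a"}},
# #    {"users/1", :update, %{"email" => "b@example.com", "name" => "b"}}]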

On-disk format for the log is

<<tx_offset::64, op_offset::64, key_size::32, key::binary-size(key_size), op_type::8, json_size::64, json::binary-size(json_size)>>
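
A minimal sketch of encoding and decoding one entry in this format; the integer codes for op_type are an assumption here, not necessarily the PR's values:

defmodule LogEntrySketch do
  # Hypothetical op_type codes; the PR's actual values may differ.
  @op_codes %{insert: 1, update: 2, delete: 3}

  def encode({tx_offset, op_offset}, key, op_type, json) do
    key_size = byte_size(key)
    json_size = byte_size(json)
    op_code = Map.fetch!(@op_codes, op_type)

    <<tx_offset::64, op_offset::64, key_size::32, key::binary,
      op_code::8, json_size::64, json::binary>>
  end

  def decode(<<tx_offset::64, op_offset::64, key_size::32, key::binary-size(key_size),
               op_type::8, json_size::64, json::binary-size(json_size), rest::binary>>) do
    {{tx_offset, op_offset}, key, op_type, json, rest}
  end
end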

The log file comes with a supporting chunk index

<<start_tx_offset::64, start_op_offset::64, start_file_pos::64, end_tx_offset::64, end_op_offset::64, end_file_pos::64>>

that allows aligning reads for all clients and acts as a sparse index at the same time: the client comes with an offset, we find the chunk that contains it, and then serve only the part of that chunk they haven't seen yet, same as we do right now
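
As a rough illustration of that sparse-index lookup (the module and function names here are hypothetical, not the PR's):

defmodule ChunkIndexSketch do
  # Each chunk index entry is six 64-bit integers = 48 bytes.
  @entry_size 6 * 8

  # client_offset is a {tx_offset, op_offset} tuple; returns the file positions of
  # the first chunk that ends after what the client has already seen.
  def find_chunk(chunk_index_path, client_offset) do
    chunk_index_path
    |> File.stream!([], @entry_size)
    |> Enum.find_value(fn
      <<_start_tx::64, _start_op::64, start_pos::64,
        end_tx::64, end_op::64, end_pos::64>> ->
        if {end_tx, end_op} > client_offset, do: {start_pos, end_pos}
    end)
  end
end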

@balegas balegas requested review from kevin-dp and robacourt January 16, 2025 09:50
@balegas
Contributor

balegas commented Jan 16, 2025

Exciting! Let's have two full reviews on this one.

@icehaunter icehaunter force-pushed the ilia/feat/compaction branch from aa8e1a7 to 497648c on January 16, 2025 10:00
@KyleAMathews
Contributor

Very nice! I'll let others dig into the exact details of the implementation/algorithm, but a couple of high-level things I don't see here:

  1. when do we run compaction? I didn't see what triggers it?
  2. we'll want to be cautious about how/when we run it to avoid overwhelming the server. Perhaps we have a global queue so only one compaction can run at a time?
  3. do we have some approach for measuring resource usage during compaction?
  4. it'd be nice to have a benchmark for various compaction scenarios

@balegas
Contributor

balegas commented Jan 16, 2025

when do we run compaction? I didn't see what triggers it?

It seems to be a periodic thing per shape log. We really haven't discussed strategies for running compaction.

+1 for creating benchmarks to evaluate compaction in its many dimensions.

Both things above are quite large in scope, so shall we merge this PR with compaction turned off by default and create follow-up issues to investigate performance and scheduling strategies?

Contributor

@robacourt robacourt left a comment


Awesome! Very cool. The comments are very helpful too. I've left a few comments, mostly nit-picks.

@@ -324,7 +326,7 @@ defmodule Electric.ShapeCache.FileStorage do
log_items
|> Enum.map(fn
{:chunk_boundary, offset} -> {chunk_checkpoint_key(offset), nil}
{offset, json_log_item} -> {log_key(offset), json_log_item}
{offset, key, type, json_log_item} -> {log_key(offset), {key, type, json_log_item}}
Contributor

type is operation, right? Please could it be renamed as such

Contributor

And the various other places in lib and test that type is used

Contributor

I'd also accept op_type as you've used elsewhere

@@ -0,0 +1,52 @@
defmodule Electric.ShapeCache.StorageCleaner do
Contributor

StorageCompactor or just Compactor would be clearer

Contributor

Or if you follow my other advice of refactoring parts of OnDisk into Compaction, then this could be called CompactionRunner

max_key: chunk_checkpoint_end(),
reverse: true
)
# Keep the last 2 chunks as-is just in case
Contributor

In case of what?

Contributor Author

Because we don't want to lose idempotency of writes from Postgres to the same txn.

Contributor

So why 2?

Contributor Author

Vibes, mostly. I'm assuming that at over 20MB of data we're far enough from the tip that we don't care about this property (because it only happens when the PG connection drops at an unfortunate moment, where we've written the log but didn't ack), so only a few transactions at the tip can be affected. 20MB seems like a reasonable buffer.

Contributor

So currently we ack after every transaction, right? You could have a transaction that was over 20MB. That seems like a likely time for Electric to crash too (but then succeed on restart). Could this result in duplicate entries?

Contributor Author

So that's 3 transactions over 10 MB each (to make separate chunks) coming in, then the compaction triggers, and we start 3 txns back? I assume it's possible, just not likely. Since we're tracking the compaction boundary in CubDB, we can check it before writing the incoming transactions and just discard those that are definitely in the past.

Contributor

Ah, ok. So you're saying transactions cannot span chunks. In which case, as we ack after every transaction, I think we're safe, right? Or is there some scenario where we could start 3 txns back?

More broadly I'm making a few points:

  • this explanation for why we keep the last two chunks (much of the detail of this thread) should be in the comment
  • how we ack may change in the future making this more of an issue, so it would be good to guard against it
  • I don't like dismissing issues as unlikely. I've seen plenty of "unlikely" bugs occur :) We should put a failsafe in, even if that failsafe is to log and crash horribly (your "discard those that are definitely in the past" idea seems a much nicer failsafe!)

Comment on lines 176 to 177
File.rename!(log_file_path <> ".compacted" <> ".chunk_index", log_file_path <> ".chunk_index")
File.rename!(log_file_path <> ".compacted", log_file_path)
Contributor

A comment describing how race-conditions are avoided when compaction is concurrent with reading would be handy.

@@ -0,0 +1,535 @@
defmodule Electric.ShapeCache.FileStorage.OnDisk do
Contributor

There's a lot going on in this file. I can't help but feel that splitting it up and giving it some abstractions would help in comprehending it.

Perhaps it could be split into:

  • FileStorage.LogFile
  • FileStorage.Compaction
  • FileStorage.Compaction.ActionFile
  • FileStorage.Compaction.KeyIndex
  • FileStorage.Compaction.ChunkIndex

@icehaunter
Contributor Author

@KyleAMathews

  1. Periodic with staggered periods. It's every 10 minutes per shape with a 5-minute jitter. I think we can increase that, but I thought we'd start with a somewhat reasonable default and continue from there. A compaction is triggered only if there is (by default) >30MB of data in the live log, so re-triggering every 10 minutes seems OK when there's nothing to compact (a rough sketch of this scheduling follows the list).
  2. It's not easy to "get server load" at a glance, or to plan for when there's going to be downtime on a hot instance. We have Elixir on our side in that the scheduler will slow down everything evenly, so as long as we're limiting concurrent compaction we're fine. I decided not to go with a global coordinator, instead opting for time-staggering for now.
  3. We don't, but I can add something. It's not easy to "prove" resource usage on the BEAM in general and in tests in particular, but I was very careful to read as little as possible from disk before acting on and "releasing" that data. In practice the memory overhead of the compaction process is currently ~50MB - that's because we need to sort large files and I've chosen a 50MB buffer for the external merge sort.
  4. Yeah, I'll add that next
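
A rough sketch of the time-staggered scheduling described in point 1, with illustrative names and the real storage calls stubbed out; not the PR's actual code:

defmodule CompactionSchedulerSketch do
  use GenServer

  @period :timer.minutes(10)
  @jitter :timer.minutes(5)
  @threshold_bytes 30 * 1024 * 1024

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    schedule()
    {:ok, Map.new(opts)}
  end

  @impl true
  def handle_info(:maybe_compact, state) do
    # Only compact when the live log has grown past the threshold; otherwise the
    # timer firing is a no-op.
    if live_log_bytes(state) > @threshold_bytes do
      run_compaction(state)
    end

    schedule()
    {:noreply, state}
  end

  defp schedule do
    # Each shape's process re-arms its own timer with jitter, staggering compactions.
    Process.send_after(self(), :maybe_compact, @period + :rand.uniform(@jitter))
  end

  # Placeholders for the real storage calls.
  defp live_log_bytes(_state), do: 0
  defp run_compaction(_state), do: :ok
end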

In general, the complexity of the compaction is O(n log(n)), but the constant factor is non-trivial because of random disk access.

In practice:

  1. O(N) sequential pass through the log to build a key index (in the future this could be written alongside the operations, if the index is on-disk and not in CubDB).
  2. O(N) sequential read of the key index in 50MB blocks, an in-memory O(n log(n)) sort of each block, a sequential write of each sorted block, then an O(N) read of all blocks and a sequential write of the merged, sorted file.
  3. O(N) sequential read of the sorted key index, grouping updates to the same key. Grouping is done in-memory, but we're operating on integer file positions, so it's 16 bytes per update in a group, followed by a sequential write of the "action file".
  4. 50MB-block external merge sort of the action file by offset (O(n log(n)) for the in-memory sort of each block).
  5. O(N) sequential read of both the log and the action file, with random reads into the log file whenever we encounter an update group, producing a sequential write of the compacted log. The random reads crank up the practical cost, but we load updates pairwise into memory for the merge, so memory overhead is small.

I don't see how we can improve the last part, where we need to read all the updates in order to merge them, without loading more into memory, but we could e.g. use a Rust/Zig implementation of the external merge sort quite easily to lower overheads there.
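
For reference, a rough sketch of the block-based external merge sort shape used in steps 2 and 4 above: sort memory-bounded blocks of fixed-size records into run files, then k-way merge the runs. Record size, buffer size and file naming are illustrative, not taken from the PR.

defmodule ExternalSortSketch do
  @record_size 16
  @buffer_bytes 50 * 1024 * 1024

  def sort(path) do
    path |> write_sorted_runs() |> merge_runs(path <> ".sorted")
  end

  defp write_sorted_runs(path) do
    path
    |> File.stream!([], @buffer_bytes)
    |> Stream.with_index()
    |> Enum.map(fn {block, i} ->
      # Split the block into fixed-size records, sort them in memory, write a run file.
      records = for <<record::binary-size(@record_size) <- block>>, do: record
      run_path = "#{path}.run.#{i}"
      File.write!(run_path, Enum.sort(records))
      run_path
    end)
  end

  defp merge_runs(run_paths, out_path) do
    out = File.open!(out_path, [:write, :binary])
    devices = Enum.map(run_paths, &File.open!(&1, [:read, :binary]))

    # Prime one record per run, then repeatedly emit the smallest head.
    heads = for dev <- devices, rec = read_record(dev), do: {rec, dev}
    do_merge(heads, out)

    Enum.each(devices, &File.close/1)
    File.close(out)
    out_path
  end

  defp do_merge([], _out), do: :ok

  defp do_merge(heads, out) do
    {rec, dev} = Enum.min_by(heads, fn {rec, _dev} -> rec end)
    IO.binwrite(out, rec)

    rest = List.delete(heads, {rec, dev})

    case read_record(dev) do
      nil -> do_merge(rest, out)
      next -> do_merge([{next, dev} | rest], out)
    end
  end

  defp read_record(dev) do
    case IO.binread(dev, @record_size) do
      :eof -> nil
      data -> data
    end
  end
end

The in-memory footprint stays at roughly one block for the run phase plus one record per run for the merge, which lines up with the ~50MB overhead mentioned above.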

@KyleAMathews
Contributor

A compaction is triggered only if there are (by default) >30MB of data in the live log, so re-triggering every 10 minutes seems OK given that there's nothing to compact.

What I've been imagining is we'd also want to measure the amount of "entropy" in the log — we don't want to do compaction on large logs every 10 minutes unless there's been a ton of new updates.

I decided to not go with a global coordinator for now, instead opting for time-staggering for now

Cool — benchmarks + server metrics on live servers will tell us if this is fine or not soon enough

@balegas
Contributor

balegas commented Jan 19, 2025

We need to put this feature behind a feature flag, given that we don't have enough knowledge about how it will impact the system at this point. We will do follow-up work to learn about compaction scheduling and performance impact.

One idea is to use the custom params when creating a shape to enable compaction for that shape. The custom param could be labeled as experimental, so we can remove it later.

@samwillis
Contributor

I think a potentially simple way to measure, and trigger, this is "log churn":

churn = (updates + deletes) / inserts

We are only compacting updates (do we remove all updates if there is a delete? assuming we do) and so churn = 2 indicates that each row has at least an insert + update/delete on average. Therefore, we want to run compaction when churn is some level higher than 2. Maybe 2.5 is a good starting point for experiments.

There are more sophisticated ways to measure this that would take into account "hot records", but this seems nice and simple.
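
A minimal sketch of that trigger, assuming we keep per-shape operation counters; the names and the counter plumbing are hypothetical:

defmodule ChurnTriggerSketch do
  # Suggested starting threshold from the discussion above.
  @churn_threshold 2.5

  def compact?(%{inserts: inserts, updates: updates, deletes: deletes}) when inserts > 0 do
    (updates + deletes) / inserts > @churn_threshold
  end

  def compact?(_counters), do: false
end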

@KyleAMathews
Contributor

Kafka's algorithm for triggering compaction, btw, is to track a "dirty ratio", which for us would be the % of messages that are duplicate updates: https://developer.confluent.io/courses/architecture/compaction/#when-compaction-is-triggered

We could track something similar in the shape log metadata, so that whenever we hit, say, ~30% dirty updates, we run compaction.
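
A hypothetical sketch of tracking that in the shape log metadata, counting an update to an already-seen key as "dirty"; the field names and the exact definition of dirty are assumptions, not from the linked Kafka doc or the PR:

defmodule DirtyRatioSketch do
  @dirty_threshold 0.30

  # meta starts as %{total_ops: 0, dirty_ops: 0, seen_keys: MapSet.new()}
  # and is updated as each operation is appended to the shape log.
  def record_op(meta, key, op_type) do
    dirty? = op_type == :update and MapSet.member?(meta.seen_keys, key)

    %{
      meta
      | total_ops: meta.total_ops + 1,
        dirty_ops: meta.dirty_ops + if(dirty?, do: 1, else: 0),
        seen_keys: MapSet.put(meta.seen_keys, key)
    }
  end

  def compact?(%{total_ops: total, dirty_ops: dirty}) when total > 0 do
    dirty / total >= @dirty_threshold
  end

  def compact?(_meta), do: false
end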


netlify bot commented Jan 20, 2025

Deploy Preview for electric-next ready!

🔨 Latest commit: 673ae82
🔍 Latest deploy log: https://app.netlify.com/sites/electric-next/deploys/678eaf333aaae1000880e74c
😎 Deploy Preview: https://deploy-preview-2231--electric-next.netlify.app
