
perf: replace default engine JSON reader's FileStream with concurrent futures #711

Open
wants to merge 36 commits into main

Conversation

@zachschuermann (Collaborator) commented Feb 21, 2025

What changes are proposed in this pull request?

The original FileStream API, though intended to make concurrent GET requests to the object store, actually made serial requests and relied on a hand-written poll function to implement Stream. This PR aims to make a minimal change that (1) increases JSON reader performance by issuing concurrent GET requests and (2) simplifies the code by removing the need for a custom Stream, instead leveraging existing functions/adapters to convert the files to read into a Stream and issue concurrent requests through the futures::stream::buffered adapter.

This is effectively a similar improvement as in #595 but for the JSON reader.

Specifically, the changes are:

  1. replace the FileStream::new_async_read_iterator() call (the manually implemented Stream) with an inline conversion of the files slice into a Stream (via stream::iter), using the futures::stream::buffered adapter to execute the file-opening futures concurrently. Results are then sent across an mpsc channel to bridge the async/sync gap (see the sketch after this list).
  2. JsonOpener no longer implements FileOpener (which requires a synchronous fn open()) and instead directly exposes an async fn open() for easier/simpler use above. This removes all reliance on FileStream/FileOpener in the JSON reader.
  3. adds a custom ObjectStore implementation, OrderedGetStore, to deterministically control the order in which GET request futures are resolved
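
At a high level, the new read path looks roughly like the following sketch. This is an illustration of the combinator chain described above, not the PR's actual code - the generic signature and names (forward_in_order, open_futures) are placeholders:

```rust
use std::future::Future;
use std::sync::mpsc;

use futures::stream::{self, StreamExt, TryStream, TryStreamExt};

/// Sketch: turn one `open` future per file into a stream, run up to `readahead`
/// of them concurrently (order-preserving), flatten each file's batch stream
/// into one stream, and forward every result over an mpsc channel so a
/// synchronous caller can iterate the receiver.
async fn forward_in_order<F, S, T, E>(
    open_futures: Vec<F>,
    readahead: usize,
    tx: mpsc::Sender<Result<T, E>>,
) where
    F: Future<Output = Result<S, E>>,
    S: TryStream<Ok = T, Error = E> + Unpin,
{
    let mut stream = stream::iter(open_futures)
        .buffered(readahead) // concurrent opens, results yielded in input order
        .try_flatten(); // concatenate the per-file streams, preserving Errs
    while let Some(item) = stream.next().await {
        if tx.send(item).is_err() {
            break; // receiver hung up; nothing left to do
        }
    }
}
```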

How was this change tested?

added a test with a new OrderedGetStore which resolves the GET requests in a jumbled order, while we expect the test to return results in the natural order of the requests. Additionally, manually validated that we went from serial JSON file reads to concurrent reads.


codecov bot commented Feb 21, 2025

Codecov Report

Attention: Patch coverage is 79.94186% with 69 lines in your changes missing coverage. Please review.

Project coverage is 83.99%. Comparing base (4c00de4) to head (1e6f746).

Files with missing lines | Patch % | Lines
kernel/src/engine/default/json.rs | 79.94% | 62 Missing and 7 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #711      +/-   ##
==========================================
- Coverage   84.02%   83.99%   -0.04%     
==========================================
  Files          77       77              
  Lines       18063    18349     +286     
  Branches    18063    18349     +286     
==========================================
+ Hits        15178    15412     +234     
- Misses       2167     2217      +50     
- Partials      718      720       +2     

☔ View full report in Codecov by Sentry.

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Feb 21, 2025
@zachschuermann zachschuermann removed the breaking-change Change that will require a version bump label Feb 24, 2025
@github-actions github-actions bot added the breaking-change Change that will require a version bump label Feb 24, 2025
@zachschuermann (Collaborator Author):

> wish the open function could still be a bit simpler, but let's not spin our wheels on that too much. This mostly looks good, just a couple of small things.

yea i do too but mostly it just looks like a copypasta of the arrow_json way of async stream decoding

@scovich (Collaborator) left a comment:

Sorry, somehow forgot to post this yesterday :(

- readahead: 10,
- batch_size: 1024,
+ readahead: 1000,
+ batch_size: 1024 * 128,
Collaborator:

How does this batch size influence behavior?

AFAIK, the vast majority of Delta commits are tiny -- just a few file actions -- and so a large batch size may not be especially helpful in the common case. It may also not hurt, depending on how it's used, hence the question.

Note that we do expect to see absurdly massive Delta commits on occasion -- tens of GB or more -- if e.g. a big CREATE [OR REPLACE] TABLE AS operation commits.

Collaborator Author:

After a bit of digging, this is the number of rows that we decode at once (keep resident in memory) until we yield a batch. I've updated doc comments throughout but TLDR it's just a limit on the number of rows in each output batch

Comment on lines 104 to 105
// check err?
let _ = tx.send(item);
Collaborator:

Right now, we're just sending all results to the receiver -- errors and all?

Collaborator Author:

I think I've updated this since your review - yeah, we send errors over the channel; this now just warn!s if send returns an error (which would mean no one is listening on the other end)
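
For context, a minimal sketch of that behavior (hypothetical helper, not the PR's exact code):

```rust
use std::sync::mpsc;

use tracing::warn;

/// Sketch: every item, Ok or Err, is forwarded to the receiver; a failed send
/// only means the receiving side was dropped, so we log it and stop early.
fn forward_results<T, E>(
    results: impl IntoIterator<Item = Result<T, E>>,
    tx: mpsc::Sender<Result<T, E>>,
) {
    for item in results {
        if tx.send(item).is_err() {
            warn!("receiver dropped before all results were sent");
            break;
        }
    }
}
```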


let mut stream = stream::iter(file_futures)
.buffered(readahead)
.try_flatten()
Collaborator:

What does try_flatten do? And where is it defined/documented?
(my google-fu is apparently weak today)

Collaborator:

Ah, found it -- TryStreamExt::try_flatten (not to be confused with TryFutureExt::try_flatten).

So open returns a future (whose Ok result is a stream) and try_flatten effectively concatenates all those streams into a single stream, but preserving any Err results?

And this is the key to preserving order, because each stream is ordered within its file, and the flattened stream guarantees that

> each individual stream will get exhausted before moving on to the next

Collaborator Author:

yes exactly! (and I'll document this more in line)

the key to ordering is that both buffered and try_flatten are combinators on the stream which each retain ordering
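
A small, self-contained illustration of that ordering guarantee (toy data, not the kernel code): buffered(n) polls up to n futures concurrently but yields their outputs in input order, and try_flatten exhausts each inner stream before moving to the next.

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryStreamExt};

fn main() {
    // three "files", each opening to a stream of two (file, row) items
    let file_futures = (0..3).map(|file| async move {
        Ok::<_, ()>(stream::iter((0..2).map(move |row| Ok::<_, ()>((file, row)))))
    });
    let results = block_on(
        stream::iter(file_futures)
            .buffered(2) // up to 2 concurrent "opens", output order == input order
            .try_flatten() // exhaust each file's stream before the next
            .collect::<Vec<_>>(),
    );
    assert_eq!(
        results,
        vec![Ok((0, 0)), Ok((0, 1)), Ok((1, 0)), Ok((1, 1)), Ok((2, 0)), Ok((2, 1))]
    );
}
```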

GetResultPayload::File(file, _) => {
let reader = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(BufReader::new(file))?;
Collaborator:

If you can see this, consider hiding whitespace:
[image]

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Feb 27, 2025
})
.collect();

let _ = future::join_all(handles).await;
Collaborator:

Do we actually need this call if we use mpsc::IntoIter?

> This iterator will block whenever next is called, waiting for a new message, and None will be returned if the corresponding channel has hung up.

(see above)

Collaborator Author:

hm I ran into some odd behavior without it - it looks like we just don't wait on any of the spawned tasks to finish and then we 'finish' the test without actually doing anything. can look into this more deeply later :)

Collaborator:

Yeah, we probably need to do what the actual json read code is doing, and produce a flattened stream of futures.

}
}

/// Set the maximum number of batches to read ahead during [Self::read_json_files()].
/// Deprecated: use [Self::with_buffer_size()].
Collaborator:

Trying to avoid a breaking change or something?

Collaborator Author:

yep exactly - I may just collect some of these "we need a breaking change sometime" items into an issue, and then whenever we decide to pursue 0.8 (and have an actual need for breaking changes) we can remove some of these deprecated functions

Comment on lines 626 to 627
// note: join_all is ordered
let files = future::join_all(file_futures).await;
Collaborator:

it may be ordered, but it also materializes the entire list up front (and could cause silent data loss if the mpsc overflows).

Is there not a way to try-flatten the streams into a single stream that we then convert to a blocking iterator?

Collaborator:

That said -- I don't think this test actually adds any value over the new test that leverages the ordered object store. Two items is too few to reliably catch races, and if there were a race, we don't want a test that only notices some of the time.

I think as long as we have tested that our stream machinery preserves order, and verified that the json reads return correct data at all, probably don't need much or any testing for the combination of the two?

Put another way -- what code path(s) does this test exercise, that other tests did not cover?

Collaborator:

Also -- what does it mean for join_all to be "ordered" in the first place? I thought spawn kicked off the tasks independently, and so they could complete in any order even if nobody ever joins on them?

Collaborator Author:

(working on making this test a better one than just the two items it had before)

for context on both tests:

  1. test_ordered_get_store is just a test to validate that our special OrderedGetStore does the right thing
  2. test_read_json_files_ordering is actually using the OrderedGetStore to set up a specific out-of-order test so that we ensure read_json_files hands things back in the correct order

Collaborator Author:

> Also -- what does it mean for join_all to be "ordered" in the first place? I thought spawn kicked off the tasks independently, and so they could complete in any order even if nobody ever joins on them?

regardless of using spawn or not, it means that the list of futures (JoinHandles if spawned, or some other futures if not) resolves to results in its original order - the returned files are in the original order of the list of file_futures, NOT in the order that the futures complete.
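
A small example of that guarantee (assuming tokio with the rt/macros/time features): the tasks finish out of order, but join_all reports results in the order the handles were passed in.

```rust
use futures::future;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let delays_ms = [30u64, 10, 20];
    let handles: Vec<_> = delays_ms
        .into_iter()
        .enumerate()
        .map(|(i, ms)| {
            tokio::spawn(async move {
                sleep(Duration::from_millis(ms)).await; // completes out of order
                i
            })
        })
        .collect();
    // join_all resolves to outputs in the order of `handles`, not completion order
    let results: Vec<usize> = future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    assert_eq!(results, vec![0, 1, 2]);
}
```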

@scovich (Collaborator) left a comment:

I still don't understand why the unit test behaves the way it does, but the logic in the actual json reader looks correct.


@@ -2,19 +2,22 @@

use std::io::BufReader;
use std::ops::Range;
- use std::sync::Arc;
- use std::task::{ready, Poll};
+ use std::sync::{mpsc, Arc};
Collaborator:

Consider tokio::sync::mpsc instead? much faster, designed to be used in async context

Collaborator:

we don't depend on tokio in the default engine except for implementing executors in terms of it. it might be fine, but for now we can stay with the stdlib

let result = self.inner.get(location).await;

// we implement a future which only resolves once the requested path is next in order
future::poll_fn(move |cx| {
Collaborator:

What happens when one slow task is at the front of the line? Everything just waits for that right? I think in an ideal network situation this works fine, but if one slow future is at the front it seems like this just log jams the entire process.

Collaborator:

Yeah, but what we're simulating here is a specific ordering of data returned. We're not trying to check if things are performant or anything. So if there's a "slow" request in this case, it implies that all the other requests must be slower, since we've specified the order they should return up front.

Really this is a test for "can kernel handle it when async stuff returns out of order".

@scovich (Collaborator) commented Feb 28, 2025:

In particular: kernel's log replay requires that results come back in the order they were requested in, not the order they completed in. That's a correctness constraint. And yes, if there's a straggler at the head of the queue (in real life) that does mean everybody else is waiting. I would hope the async machinery still allows the tasks deeper in the queue to make progress meanwhile.

This test is forcing out of order completion to ensure the results are still returned in order.

}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_read_json_files_ordering() {
Collaborator:

I think it might be helpful to have a test that exceeds the buffering limit.

Collaborator Author:

added!

@nicklan (Collaborator) left a comment:

just a couple of small things but basically lgtm

@@ -159,3 +159,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"fmt",
] }
async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
Collaborator:

nit: keep alphabetical.

Collaborator Author:

moved it up, to after our path-based deps but above the others - though it doesn't look like those are in order either..

Collaborator:

hah right, we should actually alphabetize those at some point :)


state.ordered_keys.pop_front().unwrap();

// there are three possible cases, either:
// 1. the next key has a waker already registered, in which case we wake it up
Collaborator:

nit: maybe note that this is the case where something has already requested the next key in line, so that's why there is a waker waiting, and we need to wake it up

Collaborator Author:

added more!
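
For readers following along, a stripped-down sketch of that waker bookkeeping (hypothetical types and names - the real OrderedGetStore in the test wraps an ObjectStore and returns GET results):

```rust
use std::collections::{HashMap, VecDeque};
use std::future;
use std::sync::Mutex;
use std::task::{Poll, Waker};

struct OrderingState {
    ordered_keys: VecDeque<String>, // keys in the order they are allowed to resolve
    wakers: HashMap<String, Waker>, // callers that arrived before their turn
}

/// Sketch: park until `key` is at the front of `ordered_keys`; when it is, pop
/// it and wake the next key's waiter if that waiter has already registered.
async fn resolve_in_order(state: &Mutex<OrderingState>, key: String) {
    future::poll_fn(|cx| {
        let mut guard = state.lock().unwrap();
        if guard.ordered_keys.front() == Some(&key) {
            guard.ordered_keys.pop_front();
            // case 1: the next key already has a waker registered -> wake it;
            // otherwise either no one has requested it yet, or no keys remain
            let next = guard.ordered_keys.front().cloned();
            if let Some(waker) = next.and_then(|n| guard.wakers.remove(&n)) {
                waker.wake();
            }
            Poll::Ready(())
        } else {
            // not our turn yet: register (or refresh) our waker and wait
            guard.wakers.insert(key.clone(), cx.waker().clone());
            Poll::Pending
        }
    })
    .await
}
```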


let result = self.inner.get(location).await;

// we implement a future which only resolves once the requested path is next in order
future::poll_fn(move |cx| {
Collaborator:

Why not just have this return Poll::Ready(result)?

Collaborator Author:

poll_fn takes an FnMut, so returning the result directly would require either (1) capturing the result in the closure and moving it out on each call (impossible - it would be moved multiple times) or (2) doing the self.inner.get directly inside the poll_fn, which I think is also difficult since the poll_fn closure is synchronous and we want to be able to .await.

let me know if I'm missing something but i played with it for a second and came up with those items!
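
A minimal illustration of that constraint (not the PR's code): poll_fn requires an FnMut, so a closure that moves a captured result out is rejected; one hypothetical workaround is stashing the value in an Option and take()-ing it on the final poll.

```rust
use std::future;
use std::task::Poll;

async fn resolve_once(result: String) -> String {
    // `future::poll_fn(move |_cx| Poll::Ready(result))` would NOT compile:
    // moving `result` out makes the closure FnOnce, but poll_fn needs FnMut.
    let mut slot = Some(result);
    future::poll_fn(move |_cx| Poll::Ready(slot.take().expect("polled after completion"))).await
}
```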

@zachschuermann zachschuermann removed the breaking-change Change that will require a version bump label Feb 28, 2025
@github-actions github-actions bot added the breaking-change Change that will require a version bump label Feb 28, 2025
- fn new(inner: T, ordered_keys: impl Into<VecDeque<Path>>) -> Self {
-     let ordered_keys = ordered_keys.into();
+ fn new(inner: T, ordered_keys: &[Path]) -> Self {
+     let ordered_keys: Vec<Path> = ordered_keys.to_vec();
Collaborator:

Probably don't need the type annotation?

Collaborator Author:

ah yep

Labels
breaking-change Change that will require a version bump

4 participants