perf: replace default engine JSON reader's FileStream with concurrent futures #711
base: main
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:
```
@@            Coverage Diff             @@
##             main     #711      +/-   ##
==========================================
- Coverage   84.02%   83.99%   -0.04%
==========================================
  Files          77       77
  Lines       18063    18349     +286
  Branches    18063    18349     +286
==========================================
+ Hits        15178    15412     +234
- Misses       2167     2217      +50
- Partials      718      720       +2
```
☔ View full report in Codecov by Sentry.
yea i do too but mostly just looks like a copypasta of the arrow_json way of async stream decoding
Sorry, somehow forgot to post this yesterday :(
kernel/src/engine/default/json.rs (Outdated)
```diff
-    readahead: 10,
-    batch_size: 1024,
+    readahead: 1000,
+    batch_size: 1024 * 128,
```
How does this batch size influence behavior?
AFAIK, the vast majority of Delta commits are tiny -- just a few file actions -- and so a large batch size may not be especially helpful in the common case. It may also not hurt, depending on how it's used, hence the question.
Note that we do expect to see absurdly massive Delta commits on occasion -- tens of GB or more -- if e.g. a big CREATE [OR REPLACE] TABLE AS operation commits.
After a bit of digging, this is the number of rows that we decode at once (keep resident in memory) until we yield a batch. I've updated doc comments throughout but TLDR it's just a limit on the number of rows in each output batch
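For context, a standalone sketch of what that limit means in practice, using the arrow-json `ReaderBuilder` directly with a made-up three-row input (not kernel code):

```rust
use std::io::BufReader;
use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, true)]));
    let data: &[u8] = b"{\"x\": 1}\n{\"x\": 2}\n{\"x\": 3}\n";

    // batch_size = 2: three input rows come back as two batches of 2 and 1 rows.
    let reader = ReaderBuilder::new(schema)
        .with_batch_size(2)
        .build(BufReader::new(data))?;

    let row_counts: Vec<usize> = reader
        .map(|batch| Ok::<_, ArrowError>(batch?.num_rows()))
        .collect::<Result<_, _>>()?;
    assert_eq!(row_counts, vec![2, 1]);
    Ok(())
}
```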
kernel/src/engine/default/json.rs (Outdated)
```rust
// check err?
let _ = tx.send(item);
```
Right now, we're just sending all results to the receiver -- errors and all?
I think I've updated this since your review - yea, we send errors over the channel; this now just `warn!`s if send returns an error (which would mean no one is listening on the other end)
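Roughly this shape, as a minimal sketch (not the PR's actual code; `send_item` is a made-up helper and `tracing::warn!` is assumed for logging):

```rust
use std::sync::mpsc;

use tracing::warn;

fn send_item<T>(tx: &mpsc::Sender<T>, item: T) {
    // A send error only means the receiver was dropped, so log and move on
    // rather than propagating it.
    if tx.send(item).is_err() {
        warn!("receiver hung up; dropping item");
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<Result<u32, String>>();
    send_item(&tx, Ok(1));
    send_item(&tx, Err("decode failed".to_string())); // errors ride the channel too
    drop(rx);
    send_item(&tx, Ok(2)); // logs a warning: no one is listening anymore
}
```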
```rust
let mut stream = stream::iter(file_futures)
    .buffered(readahead)
    .try_flatten()
```
What does `try_flatten` do? And where is it defined/documented?
(my google-fu is apparently weak today)
Ah, found it -- `TryStreamExt::try_flatten` (not to be confused with `TryFutureExt::try_flatten`).
So `open` returns a future (whose Ok result is a stream) and `try_flatten` effectively concatenates all those streams into a single stream, but preserving any Err results?
And this is the key to preserving order, because each stream is ordered within its file, and the flattened stream guarantees that each individual stream will get exhausted before moving on to the next.
yes exactly! (and I'll document this more inline) - the key to ordering is both `buffered` and `try_flatten`, both combinators on the stream which each retain ordering
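A standalone sketch of that ordering guarantee, with toy streams standing in for the per-file record-batch streams (not the PR's code):

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        // One future per "file"; each resolves to an ordered stream of that file's items.
        let file_futures = (0..3u32).map(|file| async move {
            Ok::<_, std::io::Error>(stream::iter(
                (0..2u32).map(move |row| Ok::<_, std::io::Error>((file, row))),
            ))
        });

        let results: Vec<(u32, u32)> = stream::iter(file_futures)
            .buffered(2) // drive up to 2 "open" futures at once; outputs keep input order
            .try_flatten() // exhaust each file's stream before moving on, keeping errors
            .try_collect()
            .await
            .unwrap();

        // Items come back grouped by file, in the original file order.
        assert_eq!(results, vec![(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)]);
    });
}
```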
```rust
GetResultPayload::File(file, _) => {
    let reader = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build(BufReader::new(file))?;
```
kernel/src/engine/default/json.rs (Outdated)
```rust
    })
    .collect();

let _ = future::join_all(handles).await;
```
Do we actually need this call if we use mpsc::IntoIter?
> This iterator will block whenever next is called, waiting for a new message, and None will be returned if the corresponding channel has hung up.

(see above)
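A tiny standalone sketch of that blocking-iterator behavior, for reference:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, hanging up the channel.
    });

    // Each `next()` call blocks until a message arrives; iteration ends (None)
    // once every sender has been dropped.
    let received: Vec<i32> = rx.into_iter().collect();
    assert_eq!(received, vec![0, 1, 2]);
}
```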
hm I ran into some odd behavior without it - it looks like we just don't wait on any of the spawned tasks to finish and then we 'finish' the test without actually doing anything. can look into this more deeply later :)
Yeah, we probably need to do what the actual json read code is doing, and produce a flattened stream of futures.
```rust
    }
}

/// Set the maximum number of batches to read ahead during [Self::read_json_files()].
/// Deprecated: use [Self::with_buffer_size()].
```
Trying to avoid a breaking change or something?
yep exactly - I may just collect some of these "we need a breaking change sometime" items into an issue, and then whenever we decide to pursue 0.8 (and have an actual need for breaking changes) we can remove some of these deprecated functions
kernel/src/engine/default/json.rs (Outdated)
```rust
// note: join_all is ordered
let files = future::join_all(file_futures).await;
```
it may be ordered, but it also materializes the entire list up front (and could cause silent data loss if the mpsc overflows).
Is there not a way to try-flatten the streams into a single stream that we then convert to a blocking iterator?
That said -- I don't think this test actually adds any value over the new test that leverages the ordered object store. Two items is too few to reliably catch races, and if there were a race, we don't want a test that only notices some of the time.
I think as long as we have tested that our stream machinery preserves order, and verified that the json reads return correct data at all, probably don't need much or any testing for the combination of the two?
Put another way -- what code path(s) does this test exercise, that other tests did not cover?
Also -- what does it mean for `join_all` to be "ordered" in the first place? I thought `spawn` kicked off the tasks independently, and so they could complete in any order even if nobody ever joins on them?
(working on making this test a better one than just the two items it had before)
for context on both tests:
- `test_ordered_get_store` is just a test to validate that our special `OrderedGetStore` does the right thing
- `test_read_json_files_ordering` is actually using the `OrderedGetStore` to set up a specific out-of-order test so that we ensure `read_json_files` hands things back in the correct order
> Also -- what does it mean for join_all to be "ordered" in the first place? I thought spawn kicked off the tasks independently, and so they could complete in any order even if nobody ever joins on them?

regardless of using spawn or not, it means that the list of futures (`JoinHandle`s if `spawn`, or some other futures if not) are resolved in order - the returned `files` is in the original order of the list of `file_futures`, NOT in the order that they are resolved.
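A small sketch of that guarantee: the spawned tasks below deliberately finish out of order, yet `join_all` yields their outputs in the order the handles were collected (tokio is used here just for illustration):

```rust
use std::time::Duration;

use futures::future;

#[tokio::main]
async fn main() {
    // Task i sleeps longer the smaller i is, so completion order is 2, 1, 0...
    let handles: Vec<_> = (0..3u64)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(30 - 10 * i)).await;
                i
            })
        })
        .collect();

    // ...but join_all resolves the handles in the order they were passed in.
    let results: Vec<u64> = future::join_all(handles)
        .await
        .into_iter()
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(results, vec![0, 1, 2]);
}
```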
I still don't understand why the unit test behaves the way it does, but the logic in the actual json reader looks correct.
```
@@ -2,19 +2,22 @@
use std::io::BufReader;
use std::ops::Range;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::sync::{mpsc, Arc};
```
Consider `tokio::sync::mpsc` instead? much faster, designed to be used in async contexts
we don't depend on tokio in the default engine except for implementing executors in terms of it. it might be fine, but for now we can stay stdlib
```rust
let result = self.inner.get(location).await;

// we implement a future which only resolves once the requested path is next in order
future::poll_fn(move |cx| {
```
What happens when one slow task is at the front of the line? Everything just waits for that right? I think in an ideal network situation this works fine, but if one slow future is at the front it seems like this just log jams the entire process.
Yeah, but what we're simulating here is a specific ordering of data returned. We're not trying to check if things are performant or anything. So if there's a "slow" request in this case, it implies that all the other requests must be slower, since we've specified the order they should return up front.
Really this is a test for "can kernel handle it when async stuff returns out of order".
In particular: kernel's log replay requires that results come back in the order they were requested in, not the order they completed in. That's a correctness constraint. And yes, if there's a straggler at the head of the queue (in real life) that does mean everybody else is waiting. I would hope the async machinery still allows the tasks deeper in the queue to make progress meanwhile.
This test is forcing out of order completion to ensure the results are still returned in order.
```rust
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_read_json_files_ordering() {
```
I think it might be helpful to have a test that exceeds the buffering limit.
added!
just a couple of small things but basically lgtm
kernel/Cargo.toml (Outdated)
```diff
@@ -159,3 +159,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
     "env-filter",
     "fmt",
 ] }
+async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
```
nit: keep alphabetical.
moved it up to just after our path-based deps but above the others, though it doesn't look like those are in order either..
hah right, we should actually alphabetize those at some point :)
kernel/src/engine/default/json.rs (Outdated)
```rust
state.ordered_keys.pop_front().unwrap();

// there are three possible cases, either:
// 1. the next key has a waker already registered, in which case we wake it up
```
nit: maybe note that this is the case where something has already requested the next key in line, so that's why there is a waker waiting, and we need to wake it up
added more!
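For readers following along, here's a stripped-down sketch of the gating pattern under discussion; the names (`OrderState`, `wait_for_turn`) are illustrative, not the PR's actual `OrderedGetStore` internals:

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};

use futures::future;
use object_store::path::Path;

struct OrderState {
    ordered_keys: VecDeque<Path>,
    wakers: HashMap<Path, Waker>,
}

/// Resolve only once `key` reaches the front of `ordered_keys`.
async fn wait_for_turn(state: Arc<Mutex<OrderState>>, key: Path) {
    future::poll_fn(move |cx| {
        let mut state = state.lock().unwrap();
        if state.ordered_keys.front() == Some(&key) {
            // Our turn: pop ourselves off and wake whoever is waiting on the next
            // key (if that request has already registered a waker).
            state.ordered_keys.pop_front();
            if let Some(next) = state.ordered_keys.front().cloned() {
                if let Some(waker) = state.wakers.remove(&next) {
                    waker.wake();
                }
            }
            Poll::Ready(())
        } else {
            // Not our turn yet: register a waker so our predecessor can wake us later.
            state.wakers.insert(key.clone(), cx.waker().clone());
            Poll::Pending
        }
    })
    .await;
}
```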
```rust
let result = self.inner.get(location).await;

// we implement a future which only resolves once the requested path is next in order
future::poll_fn(move |cx| {
```
Why not just have this return `Poll::Ready(result)`?
`poll_fn` takes an `FnMut`, so it would require us being able to either (1) capture the result into the closure multiple times (impossible - it would move multiple times) or (2) just directly do the `self.inner.get` inside the `poll_fn`, which I think is also difficult since `poll_fn` is synchronous and we want to be able to `.await`.
let me know if I'm missing something but I played with it for a second and came up with those items!
kernel/src/engine/default/json.rs (Outdated)
```diff
-fn new(inner: T, ordered_keys: impl Into<VecDeque<Path>>) -> Self {
-    let ordered_keys = ordered_keys.into();
+fn new(inner: T, ordered_keys: &[Path]) -> Self {
+    let ordered_keys: Vec<Path> = ordered_keys.to_vec();
```
Probably don't need the type annotation?
ah yep
What changes are proposed in this pull request?
The original `FileStream` API, though intended to concurrently make GET requests to the object store, actually made serial requests and relied on a hand-written poll function in order to implement `Stream`. This PR aims to make a minimal change in order to (1) increase performance for the JSON reader by issuing concurrent GET requests and (2) simplify the code by removing the need for a custom `Stream`, instead leveraging existing functions/adapters to convert the files to read into a `Stream` and issuing concurrent requests through the `futures::stream::buffered` adapter. This is effectively a similar improvement as in #595, but for the JSON reader.
Specifically, the changes are:
- Replaces the `FileStream::new_async_read_iterator()` call (the manually-implemented `Stream`) with an inline implementation that converts the files slice into a `Stream` (via `stream::iter`) and uses the `futures::stream::buffered` adapter to concurrently execute the file-opening futures. Results are then sent across an `mpsc` channel to bridge the async/sync gap (see the sketch after this list).
- Drops the reliance on `FileOpener` (which requires a synchronous `fn open()`) and instead directly exposes an `async fn open()` for easier/simpler use above. This removes all reliance on `FileStream`/`FileOpener` in the JSON reader.
- Adds a new `ObjectStore` implementation, `OrderedGetStore`, to deterministically control the ordering in which GET request futures are resolved.
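As a rough, self-contained sketch of the async/sync bridge described above (the function name `read_files_into_channel` and the toy per-file streams are made up for illustration; the real code plugs in the object store GET and the arrow JSON decoder):

```rust
use std::sync::mpsc;

use futures::stream::{self, StreamExt, TryStreamExt};

// Stand-in for the real decoded record batches.
type Item = Result<String, std::io::Error>;

/// Async side: open files concurrently (up to `readahead` at a time), flatten the
/// per-file streams in order, and forward every item (errors included) over a std
/// mpsc channel so a synchronous consumer can iterate the results.
async fn read_files_into_channel(files: Vec<String>, readahead: usize, tx: mpsc::Sender<Item>) {
    let file_futures = files.into_iter().map(|file| async move {
        // A real implementation would issue the GET request and build the JSON
        // decoder here; we fake a tiny two-item stream per file.
        let items: Vec<Item> = vec![
            Ok(format!("{file}: batch 0")),
            Ok(format!("{file}: batch 1")),
        ];
        Ok::<_, std::io::Error>(stream::iter(items))
    });

    let mut results = stream::iter(file_futures)
        .buffered(readahead) // concurrent opens; output order matches input order
        .try_flatten() // concatenate the per-file streams, preserving errors
        .boxed();

    while let Some(item) = results.next().await {
        if tx.send(item).is_err() {
            break; // receiver hung up; stop producing
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // The real reader hands this future to the engine's task executor; here we
    // just block on it for illustration (the channel is unbounded, so this is fine).
    futures::executor::block_on(read_files_into_channel(
        vec!["00.json".into(), "01.json".into()],
        10,
        tx,
    ));
    // Sync side: a plain blocking iterator over the channel.
    for item in rx {
        println!("{item:?}");
    }
}
```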
How was this change tested?
Added a test with a new `OrderedGetStore` which will resolve the GET requests in a jumbled order, but we expect the test to return the natural order of requests. Additionally, manually validated that we went from serial JSON file reads to concurrent reads.