perf: replace default engine JSON reader's FileStream with concurrent futures #711

Status: Open. Wants to merge 36 commits into base: main.
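
The pattern named in the title, as a minimal standalone sketch. To be clear, this is not the PR's code: `fetch`, the paths, and the buffer size of 4 are placeholders, and it assumes only the `futures` and `tokio` crates. The point is that `buffered(n)` polls up to `n` futures concurrently while still yielding results in input order, so a reader can issue concurrent GETs without giving up per-file ordering:

use futures::stream::{self, StreamExt};

// Stand-in for an object-store GET plus JSON decode of one file.
async fn fetch(path: String) -> String {
    format!("contents of {path}")
}

#[tokio::main]
async fn main() {
    let paths = (0..8).map(|i| format!("test/path{i}"));
    let results: Vec<String> = stream::iter(paths)
        .map(fetch) // one future per file; nothing is polled yet
        .buffered(4) // drive up to 4 fetches at once, yield in input order
        .collect()
        .await;
    assert_eq!(results[0], "contents of test/path0");
}

(Contrast `buffer_unordered`, which yields in completion order; the ordering test in the diff below exists precisely because `buffered` has to mask a jumbled completion order.)
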
Commits (36):
ea417b5  let's see (nicklan, Feb 21, 2025)
504b20c  support --all-features again (nicklan, Feb 21, 2025)
c217abc  revert (nicklan, Feb 21, 2025)
8926729  workflows use --all-features again (nicklan, Feb 21, 2025)
04cdd68  Merge branch 'main' into fix-semvar-check (nicklan, Feb 21, 2025)
4aa86aa  wip: simple buffered streams (zachschuermann, Feb 21, 2025)
b0869fc  into_iter (zachschuermann, Feb 21, 2025)
ffed827  Merge remote-tracking branch 'upstream/main' into concurrent-json (zachschuermann, Feb 21, 2025)
0cd0cf3  cleaner selection + readme (nicklan, Feb 21, 2025)
85ebcb8  also for parquet.rs (nicklan, Feb 21, 2025)
ae8c559  add a `need_arrow` flag (nicklan, Feb 22, 2025)
22394c8  Merge branch 'main' into fix-semvar-check (nicklan, Feb 22, 2025)
ce2667f  Merge remote-tracking branch 'nick/fix-semvar-check' into concurrent-… (zachschuermann, Feb 24, 2025)
1f2f79c  Merge branch 'main' into concurrent-json (zachschuermann, Feb 24, 2025)
9862a3a  Merge remote-tracking branch 'refs/remotes/origin/concurrent-json' in… (zachschuermann, Feb 24, 2025)
bc16927  Merge remote-tracking branch 'upstream/main' into concurrent-json (zachschuermann, Feb 25, 2025)
971ed43  make Json opener async fn and add test (zachschuermann, Feb 25, 2025)
1a14f90  fmt (zachschuermann, Feb 25, 2025)
811cc2e  fix comments (zachschuermann, Feb 25, 2025)
494a470  cleanup (zachschuermann, Feb 25, 2025)
ba09853  comments and add warn for error (zachschuermann, Feb 25, 2025)
5ab75c4  address feedback (zachschuermann, Feb 25, 2025)
f6f5729  add with_buffer_size and deprecate the readahead one (zachschuermann, Feb 25, 2025)
df7a819  add deterministic test via OrderedGetStore (zachschuermann, Feb 27, 2025)
f0270c7  clean up imports (zachschuermann, Feb 27, 2025)
14e1288  combine keys and wakers under one lock (zachschuermann, Feb 27, 2025)
968926a  address feedback (zachschuermann, Feb 27, 2025)
baf04d0  better test_read_json_files_ordering (zachschuermann, Feb 27, 2025)
e05fdfa  fix docs (zachschuermann, Feb 28, 2025)
5063f8c  revert small changes (zachschuermann, Feb 28, 2025)
3316d8d  address feedback (zachschuermann, Feb 28, 2025)
2cff468  comment (zachschuermann, Feb 28, 2025)
751e582  add small buffer test (zachschuermann, Feb 28, 2025)
69284d2  flatmap (zachschuermann, Feb 28, 2025)
8551bca  fix (zachschuermann, Feb 28, 2025)
1e6f746  Merge branch 'main' into concurrent-json (zachschuermann, Mar 1, 2025)
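
Commit f6f5729 adds the knob the new test sweeps: `DefaultJsonHandler::with_buffer_size`, which bounds how many file requests are in flight at once and supersedes the deprecated readahead setting. A hedged setup sketch follows; the constructor shape and the `with_buffer_size` chaining are taken from the test diff below, but the `delta_kernel` module paths are assumed from the repo layout rather than verified:

// Assumed imports; adjust to the actual delta-kernel-rs module layout.
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::json::DefaultJsonHandler;
use object_store::memory::InMemory;

#[tokio::main]
async fn main() {
    let store = Arc::new(InMemory::new());
    let handler = DefaultJsonHandler::new(
        store,
        Arc::new(TokioMultiThreadExecutor::new(tokio::runtime::Handle::current())),
    )
    // at most 4 concurrent file GETs; replaces the deprecated readahead knob
    .with_buffer_size(4);
    let _ = handler;
}
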
kernel/src/engine/default/json.rs (152 changes: 87 additions & 65 deletions)

@@ -312,8 +312,8 @@ mod tests {
     }

     impl<T: ObjectStore> OrderedGetStore<T> {
-        fn new(inner: T, ordered_keys: impl Into<VecDeque<Path>>) -> Self {
-            let ordered_keys = ordered_keys.into();
+        fn new(inner: T, ordered_keys: &[Path]) -> Self {
+            let ordered_keys: Vec<Path> = ordered_keys.to_vec();

Review thread on the line above:
Collaborator: Probably don't need the type annotation?
Collaborator (Author): ah yep

             // Check for duplicates
             let mut seen = HashSet::new();
             for key in ordered_keys.iter() {

@@ -323,7 +323,7 @@
                 }
             }

             let state = KeysAndWakers {
-                ordered_keys,
+                ordered_keys: ordered_keys.into(),
                 wakers: HashMap::new(),
             };
@@ -603,10 +603,7 @@
         }

         // Create ordered store with natural order (0, 1, 2, ...)
-        let ordered_store = Arc::new(OrderedGetStore::new(
-            memory_store.fork(),
-            ordered_paths.clone(),
-        ));
+        let ordered_store = Arc::new(OrderedGetStore::new(memory_store, &ordered_paths));

         let (tx, rx) = mpsc::channel();
Expand Down Expand Up @@ -648,76 +645,101 @@ mod tests {
// 1. we set up a list of FileMetas (and some random JSON content) in order
// 2. we then set up an ObjectStore to resolves those paths in a jumbled order
// 3. then call read_json_files and check that the results are in order
//
// note we don't want to go over 1000 since we only buffer 1000 requests at a time
let ordered_paths: Vec<Path> = (0..1000)
.map(|i| Path::from(format!("test/path{}", i)))
.collect();
let jumbled_paths: Vec<_> = ordered_paths[100..400]
.iter()
.chain(ordered_paths[400..].iter().rev())
.chain(ordered_paths[..100].iter())
.cloned()
.collect();

let test_list: &[(usize, Vec<Path>)] = &[
// test 1: buffer_size = 1000, just 1000 jumbled paths
(
1000, // buffer_size
ordered_paths[100..400]
.iter()
.chain(ordered_paths[400..].iter().rev())
.chain(ordered_paths[..100].iter())
.cloned()
.collect(),
),
// test 2: buffer_size = 4, jumbled paths in groups of 4
(
4, // buffer_size
(0..250)
.map(|i| {
[
ordered_paths[1 + 4 * i].clone(),
ordered_paths[4 * i].clone(),
ordered_paths[3 + 4 * i].clone(),
ordered_paths[2 + 4 * i].clone(),
]
})
.flatten()
.collect_vec(),
),
];

let memory_store = InMemory::new();
for (i, path) in ordered_paths.iter().enumerate() {
memory_store
.put(path, Bytes::from(format!("{{\"val\": {i}}}")).into())
.await
.unwrap();
}
// set up our ObjectStore to resolve paths in a jumbled order
let store = Arc::new(OrderedGetStore::new(memory_store, jumbled_paths));

// convert the paths to FileMeta
let ordered_file_meta: Vec<_> = ordered_paths
.iter()
.map(|path| {
let store = store.clone();
async move {
let url = Url::parse(&format!("memory:/{}", path)).unwrap();
let location = Path::from(path.as_ref());
let meta = store.head(&location).await.unwrap();
FileMeta {
location: url,
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
for (buffer_size, jumbled_paths) in test_list {
// set up our ObjectStore to resolve paths in a jumbled order
let store = Arc::new(OrderedGetStore::new(memory_store.fork(), jumbled_paths));

// convert the paths to FileMeta
let ordered_file_meta: Vec<_> = ordered_paths
.iter()
.map(|path| {
let store = store.clone();
async move {
let url = Url::parse(&format!("memory:/{}", path)).unwrap();
let location = Path::from(path.as_ref());
let meta = store.head(&location).await.unwrap();
FileMeta {
location: url,
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
}
}
}
})
.collect();

// note: join_all is ordered
let files = future::join_all(ordered_file_meta).await;

// fire off the read_json_files call (for all the files in order)
let handler = DefaultJsonHandler::new(
store,
Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
)),
);
let schema = Arc::new(ArrowSchema::new(vec![Arc::new(Field::new(
"val",
DataType::Int32,
true,
))]));
let physical_schema = Arc::new(schema.try_into().unwrap());
let data: Vec<RecordBatch> = handler
.read_json_files(&files, physical_schema, None)
.unwrap()
.map_ok(into_record_batch)
.try_collect()
.unwrap();
})
.collect();

// note: join_all is ordered
let files = future::join_all(ordered_file_meta).await;

// fire off the read_json_files call (for all the files in order)
let handler = DefaultJsonHandler::new(
store,
Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
)),
);
let handler = handler.with_buffer_size(*buffer_size);
let schema = Arc::new(ArrowSchema::new(vec![Arc::new(Field::new(
"val",
DataType::Int32,
true,
))]));
let physical_schema = Arc::new(schema.try_into().unwrap());
let data: Vec<RecordBatch> = handler
.read_json_files(&files, physical_schema, None)
.unwrap()
.map_ok(into_record_batch)
.try_collect()
.unwrap();

// check the order
let all_values: Vec<i32> = data
.iter()
.flat_map(|batch| {
let val_col: &Int32Array = batch.column(0).as_primitive();
(0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
})
.collect();
assert_eq!(all_values, (0..1000).collect_vec());
// check the order
let all_values: Vec<i32> = data
.iter()
.flat_map(|batch| {
let val_col: &Int32Array = batch.column(0).as_primitive();
(0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
})
.collect();
assert_eq!(all_values, (0..1000).collect_vec());
}
}
}
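
For context on the harness this test leans on: `OrderedGetStore` (defined earlier in this file's tests) wraps an `ObjectStore` and forces `get` requests to complete in a predetermined key order, which is what lets the test feed `read_json_files` a deliberately jumbled completion order and still assert sorted output. Below is a stripped-down sketch of that gating idea, independent of `object_store`; the `Gate` and `WaitTurn` names are invented for illustration, and it assumes only `tokio`:

use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Shared state: the order keys are allowed to complete in, plus parked wakers.
struct Gate {
    ordered_keys: VecDeque<String>,
    wakers: HashMap<String, Waker>,
}

// A future that resolves only once its key reaches the front of the queue.
struct WaitTurn {
    gate: Arc<Mutex<Gate>>,
    key: String,
}

impl Future for WaitTurn {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut g = self.gate.lock().unwrap();
        if g.ordered_keys.front() == Some(&self.key) {
            // Our turn: leave the queue and wake whichever key is now next.
            g.ordered_keys.pop_front();
            if let Some(next) = g.ordered_keys.front() {
                if let Some(w) = g.wakers.remove(next) {
                    w.wake();
                }
            }
            Poll::Ready(())
        } else {
            // Not our turn yet: park until a predecessor wakes us.
            g.wakers.insert(self.key.clone(), cx.waker().clone());
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let keys: Vec<String> = (0..4).map(|i| format!("k{i}")).collect();
    // Allow completion only in reverse order, regardless of request order.
    let gate = Arc::new(Mutex::new(Gate {
        ordered_keys: keys.iter().rev().cloned().collect(),
        wakers: HashMap::new(),
    }));
    let completed = Arc::new(Mutex::new(Vec::new()));
    let tasks: Vec<_> = keys
        .into_iter()
        .map(|key| {
            let (gate, completed) = (gate.clone(), completed.clone());
            tokio::spawn(async move {
                WaitTurn { gate, key: key.clone() }.await;
                completed.lock().unwrap().push(key);
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
    assert_eq!(*completed.lock().unwrap(), ["k3", "k2", "k1", "k0"]);
}

The real test double additionally rejects duplicate keys and, per commit 14e1288, keeps the key queue and the wakers under a single lock; the waker hand-off is the same idea.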