Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge batcher input generic over containers #494

Merged
merged 14 commits into from
Jun 13, 2024

Conversation

antiguru
Copy link
Member

@antiguru antiguru commented May 23, 2024

Add infrastructure and implementations to support containers, both specific and general, from timely streams to merge batchers. This is the last piece to enable building arrangements from non-vector input data.

The change adds several new traits and implementations. The changes fall into roughly two categories:

  1. Capture behavior to transcribe streams of containers to chunks of sorted and consolidated data, suitable for integration in merge batchers.
  2. Split of merge batchers into chunk formation and chain maintenance.

The first allows spines to express more opinions on how streams of containers can be transformed into chunks, either by supplying specialized implementations that know exactly how to map specific containers, or by instructing a generic implementation with specific information about containers.

The second decouples the merge batcher's chain maintaining from forming chunks, which seems the right thing to do given that the two are distinct tasks.

@antiguru antiguru force-pushed the flatcontainer_e2e branch 5 times, most recently from 6d52449 to 2520e6f Compare May 31, 2024 18:41
@antiguru antiguru force-pushed the flatcontainer_e2e branch 3 times, most recently from 0daf37c to 4d350c7 Compare June 4, 2024 00:53
Copy link
Member

@frankmcsherry frankmcsherry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many comments, but marking "approve" for now because it seems safe to land if not yet perfect. Many places would love more comments, which you already warned me about. A few other notes here and there.

I think not too far off I might try and write some mod comments to try and make sure I can stay on top of all of the concepts.

src/consolidation.rs Outdated Show resolved Hide resolved
src/consolidation.rs Outdated Show resolved Hide resolved
Comment on lines 283 to 232
/// Layout of data to be consolidated.
// TODO: This could be split in two, to separate sorting and consolidation.
pub trait ConsolidateLayout: Container {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we talked about this, but I now have no clue what it does. :D

src/consolidation.rs Outdated Show resolved Hide resolved
src/consolidation.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher_flat.rs Outdated Show resolved Hide resolved
Comment on lines 166 to 164
if result.len() > 0 {
output.push(result);
} else {
self.recycle(result, stash);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems weird! At least, you are about to copy more things into result, but push out a potentially non-full vector. Is it on purpose?

src/trace/implementations/merge_batcher_flat.rs Outdated Show resolved Hide resolved
@antiguru antiguru changed the title Merge batcher for flatcontainer Merge batcher input generic over containers Jun 4, 2024
@antiguru antiguru marked this pull request as ready for review June 4, 2024 19:06
@antiguru antiguru force-pushed the flatcontainer_e2e branch 2 times, most recently from c7c4aa8 to c29adf6 Compare June 12, 2024 14:20
Make the BuilderInput trait generic over the key and value containers
instead of the layout, which makes trait bounds much simpler and allows
for reusing the `BuilerInput` implementation for generic key and value
containers, instead tying it to specific layouts.

Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Comment on lines +12 to +16
pub struct VecChunker<T> {
pending: Vec<T>,
ready: VecDeque<Vec<T>>,
empty: Option<Vec<T>>,
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could unify the type with ColumnationChunker below if that's helpful.

Copy link
Member

@frankmcsherry frankmcsherry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Some comments around whether some chunker types are truly necessary. If they are not, no worries, and we can try and figure out off the critical path if they can be optimized out.

src/consolidation.rs Outdated Show resolved Hide resolved
use crate::consolidation::{consolidate_updates, ConsolidateContainer};
use crate::difference::Semigroup;

/// Chunk a stream of vectors into chains of vectors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this materially different from a capacity container builder? If they are very similar, having one fewer abstraction is great! If there is a really important distinction, we should write it down so that I don't accidentally optimize this away in the future. :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: perhaps rather compared to consolidating container builder, as it doesn't just choose a capacity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very different, but somewhat incompatible! The consolidating container builder has a PushInto implementation for individual items, but here we want to push whole containers (RefOrMut<...>). Since the other implementation already exists, we cannot define a impl for just RefOrMut because that would conflict.

There is a path out of this mess, but I don't know if we want to go down that route: We could introduce a ContainerMarker<C>(C) struct that we can use to specialize the PushInto implementation for the consolidating container builder. Since nothing that's further up in the hierarchy can know this type, Rust knows that there can't be conflicting implementations. If we had that, we might be able to remove the VecChunker.

That aside, having $n$ different implementations that are roughly the same isn't great, and I believe with a bit more effort we could tidy it up.

where
M: Merger,
C: ContainerBuilder<Container=M::Chunk> + Default,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Input really necessary as part of the struct, or can the PushInto<Input> constraint be added somewhere? Alternately, could it be added here to signpost what it is and why it exists (or just doc it).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle, it can be removed. Then we'd only carry a PushInto bound for C with us and everything should work. But .. for flatcontainers, Rust has a hard/impossible time to figure out what the scope's timestamp type would be and I couldn't figure out how to convince it to carry that information. Maybe it's a matter of trying a bit harder, but I feel we can do that outside of this change?

#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, Config::process(3)); }
#[test] fn bfs_100_200_10() { test_sizes(100, 200, 10, Config::process(3)); }
#[test] fn bfs_100_2000_1() { test_sizes(100, 2000, 1, Config::process(3)); }
#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, 3); }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Config isn't Clone...

@antiguru antiguru merged commit d0a1cab into TimelyDataflow:master Jun 13, 2024
7 checks passed
@antiguru antiguru deleted the flatcontainer_e2e branch June 13, 2024 20:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants