Skip to content

Commit

Permalink
Add the Sorter::into_reader_cursors method
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Jun 27, 2023
1 parent 4ff6904 commit c7f7516
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions src/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ const MIN_SORTER_MEMORY: usize = 10_485_760; // 10MB
const DEFAULT_NB_CHUNKS: usize = 25;
const MIN_NB_CHUNKS: usize = 1;

use crate::{CompressionType, Error, Merger, MergerIter, Reader, Writer, WriterBuilder};
use crate::{
CompressionType, Error, Merger, MergerIter, Reader, ReaderCursor, Writer, WriterBuilder,
};

/// The kind of sort algorithm used by the sorter to sort its internal vector.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -596,23 +598,35 @@ where
}

/// Consumes this [`Sorter`] and outputs a stream of the merged entries in key-order.
pub fn into_stream_merger_iter(mut self) -> Result<MergerIter<CC::Chunk, MF>, Error<U>> {
pub fn into_stream_merger_iter(self) -> Result<MergerIter<CC::Chunk, MF>, Error<U>> {
let (sources, merge) = self.extract_reader_cursors_and_merger()?;
let mut builder = Merger::builder(merge);
builder.extend(sources);
builder.build().into_stream_merger_iter().map_err(Error::convert_merge_error)
}

/// Consumes this [`Sorter`] and outputs the list of reader cursors.
pub fn into_reader_cursors(self) -> Result<Vec<ReaderCursor<CC::Chunk>>, Error<U>> {
self.extract_reader_cursors_and_merger().map(|(readers, _)| readers)
}

/// A helper function to extract the readers and the merge function.
fn extract_reader_cursors_and_merger(
mut self,
) -> Result<(Vec<ReaderCursor<CC::Chunk>>, MF), Error<U>> {
// Flush the pending unordered entries.
self.chunks_total_size = self.write_chunk()?;

let sources: Result<Vec<_>, Error<U>> = self
.chunks
let Sorter { chunks, merge, .. } = self;
let result: Result<Vec<_>, _> = chunks
.into_iter()
.map(|mut chunk| {
chunk.seek(SeekFrom::Start(0))?;
Reader::new(chunk).and_then(Reader::into_cursor).map_err(Error::convert_merge_error)
})
.collect();

let mut builder = Merger::builder(self.merge);
builder.extend(sources?);

builder.build().into_stream_merger_iter().map_err(Error::convert_merge_error)
result.map(|readers| (readers, merge))
}
}

Expand Down

0 comments on commit c7f7516

Please sign in to comment.