From c7f75168437d39171d41f6b3092fb98a612237cf Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 27 Jun 2023 22:07:15 +0200 Subject: [PATCH] Add the Sorter::into_reader_cursors method --- src/sorter.rs | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/sorter.rs b/src/sorter.rs index 0c7a52c..94cf92d 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -19,7 +19,9 @@ const MIN_SORTER_MEMORY: usize = 10_485_760; // 10MB const DEFAULT_NB_CHUNKS: usize = 25; const MIN_NB_CHUNKS: usize = 1; -use crate::{CompressionType, Error, Merger, MergerIter, Reader, Writer, WriterBuilder}; +use crate::{ + CompressionType, Error, Merger, MergerIter, Reader, ReaderCursor, Writer, WriterBuilder, +}; /// The kind of sort algorithm used by the sorter to sort its internal vector. #[derive(Debug, Clone, Copy)] @@ -596,12 +598,27 @@ where } /// Consumes this [`Sorter`] and outputs a stream of the merged entries in key-order. - pub fn into_stream_merger_iter(mut self) -> Result, Error> { + pub fn into_stream_merger_iter(self) -> Result, Error> { + let (sources, merge) = self.extract_reader_cursors_and_merger()?; + let mut builder = Merger::builder(merge); + builder.extend(sources); + builder.build().into_stream_merger_iter().map_err(Error::convert_merge_error) + } + + /// Consumes this [`Sorter`] and outputs the list of reader cursors. + pub fn into_reader_cursors(self) -> Result>, Error> { + self.extract_reader_cursors_and_merger().map(|(readers, _)| readers) + } + + /// A helper function to extract the readers and the merge function. + fn extract_reader_cursors_and_merger( + mut self, + ) -> Result<(Vec>, MF), Error> { // Flush the pending unordered entries. self.chunks_total_size = self.write_chunk()?; - let sources: Result, Error> = self - .chunks + let Sorter { chunks, merge, .. } = self; + let result: Result, _> = chunks .into_iter() .map(|mut chunk| { chunk.seek(SeekFrom::Start(0))?; @@ -609,10 +626,7 @@ where }) .collect(); - let mut builder = Merger::builder(self.merge); - builder.extend(sources?); - - builder.build().into_stream_merger_iter().map_err(Error::convert_merge_error) + result.map(|readers| (readers, merge)) } }