Skip to content

Commit

Permalink
Introduce the Merge trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Aug 8, 2021
1 parent cf659ce commit 5b1fe6a
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 70 deletions.
70 changes: 44 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,31 @@
//! use std::convert::TryInto;
//! use std::io::Cursor;
//!
//! use grenad::{MergerBuilder, Reader, Writer};
//! use grenad::{Merge, MergerBuilder, Reader, Writer};
//!
//! // This merge function:
//! // This merger:
//! // - parses u32s from native-endian bytes,
//! // - wrapping sums them and,
//! // - outputs the result as native-endian bytes.
//! fn wrapping_sum_u32s<'a>(
//! _key: &[u8],
//! values: &[Cow<'a, [u8]>],
//! ) -> Result<Cow<'a, [u8]>, TryFromSliceError>
//! {
//! let mut output: u32 = 0;
//! for bytes in values.iter().map(AsRef::as_ref) {
//! let num = bytes.try_into().map(u32::from_ne_bytes)?;
//! output = output.wrapping_add(num);
//! #[derive(Clone, Copy)]
//! struct WrappingSumU32s;
//!
//! impl Merge for WrappingSumU32s {
//! type Error = TryFromSliceError;
//! type Output = [u8; 4];
//!
//! fn merge<I, A>(&self, key: &[u8], values: I) -> Result<Self::Output, Self::Error>
//! where
//! I: IntoIterator<Item = A>,
//! A: AsRef<[u8]>
//! {
//! let mut output: u32 = 0;
//! for value in values {
//! let num = value.as_ref().try_into().map(u32::from_ne_bytes)?;
//! output = output.wrapping_add(num);
//! }
//! Ok(output.to_ne_bytes())
//! }
//! Ok(Cow::Owned(output.to_ne_bytes().to_vec()))
//! }
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -82,7 +90,7 @@
//!
//! // We create a merger that will sum our u32s when necessary,
//! // and we add our readers to the list of readers to merge.
//! let merger_builder = MergerBuilder::new(wrapping_sum_u32s);
//! let merger_builder = MergerBuilder::new(WrappingSumU32s);
//! let merger = merger_builder.add(readera).add(readerb).add(readerc).build();
//!
//! // We can iterate over the entries in key-order.
Expand All @@ -106,28 +114,36 @@
//! use std::borrow::Cow;
//! use std::convert::TryInto;
//!
//! use grenad::{CursorVec, SorterBuilder};
//! use grenad::{Merge, CursorVec, SorterBuilder};
//!
//! // This merge function:
//! // This merger:
//! // - parses u32s from native-endian bytes,
//! // - wrapping sums them and,
//! // - outputs the result as native-endian bytes.
//! fn wrapping_sum_u32s<'a>(
//! _key: &[u8],
//! values: &[Cow<'a, [u8]>],
//! ) -> Result<Cow<'a, [u8]>, TryFromSliceError>
//! {
//! let mut output: u32 = 0;
//! for bytes in values.iter().map(AsRef::as_ref) {
//! let num = bytes.try_into().map(u32::from_ne_bytes)?;
//! output = output.wrapping_add(num);
//! #[derive(Clone, Copy)]
//! struct WrappingSumU32s;
//!
//! impl Merge for WrappingSumU32s {
//! type Error = TryFromSliceError;
//! type Output = [u8; 4];
//!
//! fn merge<I, A>(&self, key: &[u8], values: I) -> Result<Self::Output, Self::Error>
//! where
//! I: IntoIterator<Item = A>,
//! A: AsRef<[u8]>
//! {
//! let mut output: u32 = 0;
//! for value in values {
//! let num = value.as_ref().try_into().map(u32::from_ne_bytes)?;
//! output = output.wrapping_add(num);
//! }
//! Ok(output.to_ne_bytes())
//! }
//! Ok(Cow::Owned(output.to_ne_bytes().to_vec()))
//! }
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // We create a sorter that will sum our u32s when necessary.
//! let mut sorter = SorterBuilder::new(wrapping_sum_u32s).chunk_creator(CursorVec).build();
//! let mut sorter = SorterBuilder::new(WrappingSumU32s).chunk_creator(CursorVec).build();
//!
//! // We insert multiple entries with the same key but different values
//! // in arbitrary order, the sorter will take care of merging them for us.
Expand Down Expand Up @@ -158,6 +174,7 @@ extern crate quickcheck;
mod block_builder;
mod compression;
mod error;
mod merge;
mod merger;
mod reader;
mod sorter;
Expand All @@ -166,6 +183,7 @@ mod writer;

pub use self::compression::CompressionType;
pub use self::error::Error;
pub use self::merge::Merge;
pub use self::merger::{Merger, MergerBuilder, MergerIter};
pub use self::reader::Reader;
#[cfg(feature = "tempfile")]
Expand Down
22 changes: 22 additions & 0 deletions src/merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pub trait Merge {
type Error;
type Output: AsRef<[u8]>;

fn merge<I, A>(&self, key: &[u8], values: I) -> Result<Self::Output, Self::Error>
where
I: IntoIterator<Item = A>,
A: AsRef<[u8]>;
}

impl<M: Merge> Merge for &M {
type Error = <M as Merge>::Error;
type Output = <M as Merge>::Output;

fn merge<I, A>(&self, key: &[u8], values: I) -> Result<Self::Output, Self::Error>
where
I: IntoIterator<Item = A>,
A: AsRef<[u8]>,
{
(**self).merge(key, values)
}
}
42 changes: 19 additions & 23 deletions src/merger.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io;
use std::iter::once;

use crate::{Error, Reader, Writer};
use crate::{Error, Merge, Reader, Writer};

/// A struct that is used to configure a [`Merger`] with the sources to merge.
pub struct MergerBuilder<R, MF> {
Expand Down Expand Up @@ -83,7 +82,7 @@ impl<R, MF> Merger<R, MF> {
}
}

impl<R: io::Read, MF> Merger<R, MF> {
impl<R: io::Read, MF: Merge> Merger<R, MF> {
/// Consumes this [`Merger`] and outputs a stream of the merged entries in key-order.
pub fn into_merger_iter(self) -> Result<MergerIter<R, MF>, Error> {
let mut heap = BinaryHeap::new();
Expand All @@ -97,19 +96,19 @@ impl<R: io::Read, MF> Merger<R, MF> {
merge: self.merge,
heap,
current_key: Vec::new(),
merged_value: Vec::new(),
merged_value: None,
tmp_entries: Vec::new(),
})
}
}

impl<R, MF, U> Merger<R, MF>
impl<R, MF> Merger<R, MF>
where
R: io::Read,
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
MF: Merge,
{
/// Consumes this [`Merger`] and streams the entries to the [`Writer`] given in parameter.
pub fn write_into<W: io::Write>(self, writer: &mut Writer<W>) -> Result<(), Error<U>> {
pub fn write_into<W: io::Write>(self, writer: &mut Writer<W>) -> Result<(), Error<MF::Error>> {
let mut iter = self.into_merger_iter().map_err(Error::convert_merge_error)?;
while let Some((key, val)) = iter.next()? {
writer.insert(key, val)?;
Expand All @@ -119,22 +118,22 @@ where
}

/// An iterator that yield the merged entries in key-order.
pub struct MergerIter<R, MF> {
pub struct MergerIter<R, MF: Merge> {
merge: MF,
heap: BinaryHeap<Entry<R>>,
current_key: Vec<u8>,
merged_value: Vec<u8>,
merged_value: Option<MF::Output>,
/// We keep this buffer to avoid allocating a vec every time.
tmp_entries: Vec<Entry<R>>,
}

impl<R, MF, U> MergerIter<R, MF>
impl<R, MF> MergerIter<R, MF>
where
R: io::Read,
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
MF: Merge,
{
/// Yield the entries in key-order where values have been merged when needed.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error<U>> {
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error<MF::Error>> {
let first_entry = match self.heap.pop() {
Some(entry) => entry,
None => return Ok(None),
Expand All @@ -158,21 +157,15 @@ where
}
}

/// Extract the currently pointed values from the entries.
// Extract the currently pointed values from the entries.
let other_values = self.tmp_entries.iter().filter_map(|e| e.iter.current().map(|(_, v)| v));
let values: Vec<_> = once(first_value).chain(other_values).map(Cow::Borrowed).collect();
let values = once(first_value).chain(other_values);

match (self.merge)(&first_key, &values) {
match self.merge.merge(&first_key, values) {
Ok(value) => {
self.current_key.clear();
self.current_key.extend_from_slice(first_key);
match value {
Cow::Owned(value) => self.merged_value = value,
Cow::Borrowed(value) => {
self.merged_value.clear();
self.merged_value.extend_from_slice(value);
}
}
self.merged_value = Some(value);
}
Err(e) => return Err(Error::Merge(e)),
}
Expand All @@ -184,6 +177,9 @@ where
}
}

Ok(Some((&self.current_key, &self.merged_value)))
match self.merged_value.as_ref() {
Some(value) => Ok(Some((&self.current_key, value.as_ref()))),
None => Ok(None),
}
}
}
Loading

0 comments on commit 5b1fe6a

Please sign in to comment.