Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add ts filter on MergeStream #106

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ where
where
FP: 'scan,
{
let mut stream = MergeStream::<R, FP>::from_vec(streams).await?;
let mut stream = MergeStream::<R, FP>::from_vec(streams, u32::MAX.into()).await?;

// Kould: is the capacity parameter necessary?
let mut builder = R::Columns::builder(8192);
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ where
)
.await?;

Ok(MergeStream::from_vec(self.streams).await?)
Ok(MergeStream::from_vec(self.streams, self.ts).await?)
}

/// Get a Stream that returns RecordBatch consisting of a `batch_size` number of records
Expand Down Expand Up @@ -616,7 +616,7 @@ where
self.projection,
)
.await?;
let merge_stream = MergeStream::from_vec(self.streams).await?;
let merge_stream = MergeStream::from_vec(self.streams, self.ts).await?;

Ok(PackageStream::new(
batch_size,
Expand Down
49 changes: 30 additions & 19 deletions src/stream/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_util::stream::StreamExt;
use pin_project_lite::pin_project;

use super::{Entry, ScanStream};
use crate::{fs::FileProvider, record::Record};
use crate::{fs::FileProvider, record::Record, timestamp::Timestamp};

pin_project! {
pub struct MergeStream<'merge, R, FP>
Expand All @@ -21,6 +21,7 @@ pin_project! {
streams: Vec<ScanStream<'merge, R, FP>>,
peeked: BinaryHeap<CmpEntry<'merge, R>>,
buf: Option<Entry<'merge, R>>,
ts: Timestamp,
}
}

Expand All @@ -31,6 +32,7 @@ where
{
pub(crate) async fn from_vec(
mut streams: Vec<ScanStream<'merge, R, FP>>,
ts: Timestamp,
) -> Result<Self, parquet::errors::ParquetError> {
let mut peeked = BinaryHeap::with_capacity(streams.len());

Expand All @@ -44,6 +46,7 @@ where
streams,
peeked,
buf: None,
ts,
};
merge_stream.next().await;

Expand All @@ -60,6 +63,7 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let ts = this.ts;
while let Some(offset) = this.peeked.peek().map(|entry| entry.offset) {
let next = ready!(Pin::new(&mut this.streams[offset]).poll_next(cx)).transpose()?;
let peeked = match this.peeked.pop() {
Expand All @@ -70,7 +74,7 @@ where
this.peeked.push(CmpEntry::new(offset, next));
}
if let Some(buf) = this.buf {
if buf.key().value == peeked.entry.key().value {
if peeked.entry.key().ts > *ts || buf.key().value == peeked.entry.key().value {
continue;
}
}
Expand Down Expand Up @@ -195,11 +199,14 @@ mod tests {
let lower = "a".to_string();
let upper = "e".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge = MergeStream::<String, TokioExecutor>::from_vec(vec![
m1.scan(bound, 6.into()).into(),
m2.scan(bound, 6.into()).into(),
m3.scan(bound, 6.into()).into(),
])
let mut merge = MergeStream::<String, TokioExecutor>::from_vec(
vec![
m1.scan(bound, 6.into()).into(),
m2.scan(bound, 6.into()).into(),
m3.scan(bound, 6.into()).into(),
],
6.into(),
)
.await
.unwrap();

Expand Down Expand Up @@ -273,10 +280,12 @@ mod tests {
let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 0.into()).into()])
.await
.unwrap();
let mut merge = MergeStream::<String, TokioExecutor>::from_vec(
vec![m1.scan(bound, 0.into()).into()],
0.into(),
)
.await
.unwrap();

if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
assert_eq!(entry.key().value, "1");
Expand All @@ -285,26 +294,28 @@ mod tests {
};
if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
assert_eq!(entry.key().value, "2");
assert_eq!(entry.key().ts, 1.into());
assert_eq!(entry.key().ts, 0.into());
assert_eq!(entry.value().as_deref(), Some("2"));
} else {
unreachable!()
}
if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
assert_eq!(entry.key().value, "3");
assert_eq!(entry.key().ts, 1.into());
assert_eq!(entry.value().as_deref(), Some("3"));
assert_eq!(entry.key().value, "4");
assert_eq!(entry.key().ts, 0.into());
assert_eq!(entry.value().as_deref(), Some("4"));
} else {
unreachable!()
}

let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 1.into()).into()])
.await
.unwrap();
let mut merge = MergeStream::<String, TokioExecutor>::from_vec(
vec![m1.scan(bound, 1.into()).into()],
1.into(),
)
.await
.unwrap();

if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
assert_eq!(entry.key().value, "1");
Expand Down
9 changes: 6 additions & 3 deletions src/stream/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,12 @@ mod tests {
.await
.unwrap();

let merge = MergeStream::<Test, TokioExecutor>::from_vec(vec![m1
.scan((Bound::Unbounded, Bound::Unbounded), 6.into())
.into()])
let merge = MergeStream::<Test, TokioExecutor>::from_vec(
vec![m1
.scan((Bound::Unbounded, Bound::Unbounded), 6.into())
.into()],
6.into(),
)
.await
.unwrap();
let projection_indices = vec![0, 1, 2, 3];
Expand Down
5 changes: 5 additions & 0 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ mod tests {
.await
.unwrap();

{
// to increase timestamps to 1
let txn = db.transaction().await;
txn.commit().await.unwrap();
}
let mut txn = db.transaction().await;
txn.insert(Test {
vstring: "king".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl<R: Record> Trigger<R> for LengthTrigger<R> {

#[derive(Copy, Clone, Debug)]
pub enum TriggerType {
#[allow(unused)]
SizeOfMem(usize),
#[allow(unused)]
Length(usize),
}
pub(crate) struct TriggerFactory<R> {
Expand Down
Loading