Skip to content

Commit

Permalink
feat(core/types): Implement concurrent read for blocking read
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Apr 28, 2024
1 parent 71c8df1 commit 5a597d3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 83 deletions.
111 changes: 29 additions & 82 deletions core/src/types/blocking_read/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::borrow::Borrow;
use std::collections::Bound;
use std::ops::Range;
use std::ops::RangeBounds;

use bytes::Buf;
use bytes::BufMut;
use log::debug;
use rayon::prelude::*;

use crate::raw::oio::BlockingRead;
Expand All @@ -31,7 +31,8 @@ use crate::*;
/// BlockingReader is designed to read data from given path in an blocking
/// manner.
pub struct BlockingReader {
pub(crate) inner: oio::BlockingReader,
// pub(crate) inner: oio::BlockingReader,
pub(crate) iter: BufferIterator,
options: OpReader,
}

Expand All @@ -51,7 +52,9 @@ impl BlockingReader {
) -> crate::Result<Self> {
let (_, r) = acc.blocking_read(path, op)?;

Ok(BlockingReader { inner: r, options })
let iter = BufferIterator::new(r, options.clone());

Ok(BlockingReader { options, iter })
}

/// Read give range from reader into [`Buffer`].
Expand Down Expand Up @@ -82,72 +85,20 @@ impl BlockingReader {
}
}

// let iter = BufferIterator::new(self.inner, self.options);

let mut bufs = Vec::new();
let mut offset = start;
let concurrent = self.options.concurrent() as u64;


let (interval_size, mut intervals) = end
.map(|end| {
// let interval_size = (end - start + concurrent - 1) / concurrent;
let interval_size = (end - start) / concurrent;
let remainder = (end - start) % concurrent;
let intervals: Vec<(u64, u64)> = (0..concurrent)
.map(|i| {
let interval_start = start + i * interval_size + remainder.min(i);
let interval_end =
interval_start + interval_size + if i < remainder { 1 } else { 0 };
(interval_start, interval_end)
})
.filter(|(interval_start, interval_end)| interval_start != interval_end)
.collect();
(interval_size, intervals)
})
.unwrap_or({
// TODO: use service preferred io size instead.
let interval_size = 4 * 1024 * 1024;
let intervals: Vec<(u64, u64)> = (0..concurrent)
.map(|i| {
let current = start + i * interval_size;
(current, current + interval_size)
})
.collect();
(interval_size, intervals)
});

loop {

let results: Vec<Result<(usize, Buffer)>> = intervals
.into_par_iter()
.map(|(start, end)| -> Result<(usize, Buffer)> {
let limit = (end - start) as usize;

let bs = self.inner.read_at(start as u64, limit)?;
let n = bs.remaining();

Ok((n, bs))
})
.collect();
for result in results {
let result = result?;
bufs.push(result.1);
if result.0 < interval_size as usize {
return Ok(bufs.into_iter().flatten().collect());
}

offset += result.0 as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
for buf in self.iter {
match buf {
Ok(bs) => {
bufs.push(bs);
}
Err(err) => return Err(err),
}

intervals = (0..concurrent)
.map(|i| {
let current = offset + i * interval_size;
(current, current + interval_size)
})
.collect();
}

Ok(bufs.into_iter().flatten().collect())
}

///
Expand Down Expand Up @@ -177,38 +128,34 @@ impl BlockingReader {
}
}

let mut offset = start;
let mut read = 0;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
buf.put(bs);
read += n as u64;
if n < limit {
return Ok(read as _);
}
// let iter = BufferIterator::new(self.inner, self.options.clone());

let mut total_len = 0;

offset += n as u64;
if Some(offset) == end {
return Ok(read as _);
for buffer in self.iter {
match buffer {
Ok(bs) => {
total_len += bs.len();
buf.put(bs);
}
Err(err) => return Err(err),
}
}

Ok(total_len)
}

/// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
pub fn into_std_read(self, range: Range<u64>) -> StdReader {
// TODO: the capacity should be decided by services.
StdReader::new(self.inner, range)
StdReader::new(self.iter.inner, range)
}

/// Convert reader into [`StdBytesIterator`] which implements [`Iterator`].
#[inline]
pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator {
StdBytesIterator::new(self.inner, range)
StdBytesIterator::new(self.iter.inner, range)
}
}
92 changes: 92 additions & 0 deletions core/src/types/blocking_read/buffer_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::collections::Bound;
use std::ops::RangeBounds;

use bytes::Buf;
use rayon::prelude::*;

use crate::raw::oio::BlockingRead;
use crate::raw::*;
use crate::*;

pub struct BufferIterator {
pub(crate) inner: oio::BlockingReader,
chunk: Option<usize>,

offset: u64,
end: Option<u64>,
concurrent: usize,
}

impl BufferIterator {
pub fn new(inner: oio::BlockingReader, options: OpReader) -> Self {
let range = options.range().to_range();
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Bound::Unbounded => 0,
};

let end = match range.end_bound().cloned() {
Bound::Included(end) => Some(end + 1),
Bound::Excluded(end) => Some(end),
Bound::Unbounded => None,
};

Self {
inner,
chunk: options.chunk(),
offset: start,
end,
concurrent: options.concurrent(),
}
}
}

impl Iterator for BufferIterator {
type Item = Result<Buffer>;

fn next(&mut self) -> Option<Self::Item> {
if self.offset >= self.end.unwrap_or(u64::MAX) {
return None;
}

let mut bufs = Vec::with_capacity(self.concurrent);
let interval_size = self.chunk.unwrap_or(4 * 1024 * 1024) as u64;

let intervals: Vec<(u64, u64)> = (0..self.concurrent as u64)
.map(|i| {
let current = self.offset + i * interval_size;
(current, current + interval_size)
})
.collect();

let results: Vec<Result<(usize, Buffer)>> = intervals
.into_par_iter()
.map(|(start, end)| -> Result<(usize, Buffer)> {
let limit = (end - start) as usize;

let bs = self.inner.read_at(start, limit)?;
let n = bs.remaining();

Ok((n, bs))
})
.collect();
for result in results {
match result {
Ok((n, buf)) => {
bufs.push(buf);
if n < interval_size as usize {
return Some(Ok(bufs.into_iter().flatten().collect()));
}

self.offset += n as u64;
if Some(self.offset) == self.end {
return Some(Ok(bufs.into_iter().flatten().collect()));
}
}
Err(err) => return Some(Err(err)),
}
}
Some(Ok(bufs.into_iter().flatten().collect()))
}
}
2 changes: 2 additions & 0 deletions core/src/types/blocking_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ mod std_bytes_iterator;
pub use std_bytes_iterator::StdBytesIterator;
mod std_reader;
pub use std_reader::StdReader;
mod buffer_iterator;
pub use buffer_iterator::BufferIterator;
6 changes: 5 additions & 1 deletion core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ impl BlockingOperator {
FunctionRead(OperatorFunction::new(
self.inner().clone(),
path,
(OpRead::default(), BytesRange::default(), OpReader::default()),
(
OpRead::default(),
BytesRange::default(),
OpReader::default(),
),
|inner, path, (args, range, options)| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down
8 changes: 8 additions & 0 deletions core/src/types/operator/operator_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,14 @@ impl FunctionRead {
.map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent)));
self
}

/// Set the chunk size for this reader.
pub fn chunk(mut self, chunk_size: usize) -> Self {
self.0 = self
.0
.map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size)));
self
}
}

/// Function that generated by [`BlockingOperator::reader_with`].
Expand Down

0 comments on commit 5a597d3

Please sign in to comment.