Merge pull request #626 from jiangliu/prefetch
Optimize performance by fixing a bug in prefetch
jiangliu authored Aug 2, 2022
2 parents 6d66c13 + a19bf33 commit eddfd1b
Showing 8 changed files with 100 additions and 39 deletions.
34 changes: 20 additions & 14 deletions storage/src/backend/connection.rs
@@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

use reqwest::header::HeaderMap;
use reqwest::{
@@ -346,20 +346,15 @@ impl Connection {
catch_status: bool,
proxy: bool,
) -> ConnectionResult<Response> {
debug!(
"Request: {} {} headers: {:?}, proxy: {}, data: {}",
method,
url,
{
let mut display_headers = headers.clone();
display_headers.remove(HEADER_AUTHORIZATION);
display_headers
},
proxy,
data.is_some(),
);
let display_headers = {
let mut display_headers = headers.clone();
display_headers.remove(HEADER_AUTHORIZATION);
display_headers
};
let has_data = data.is_some();
let start = Instant::now();

let mut rb = client.request(method, url).headers(headers);
let mut rb = client.request(method.clone(), url).headers(headers);
if let Some(q) = query.as_ref() {
rb = rb.query(q);
}
@@ -382,6 +377,17 @@ impl Connection {
ret = rb.body("").send();
}

debug!(
"{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
std::thread::current().name().unwrap_or_default(),
method,
url,
display_headers,
proxy,
has_data,
Instant::now().duration_since(start).as_millis(),
);

match ret {
Err(err) => Err(ConnectionError::Common(err)),
Ok(resp) => respond(resp, catch_status),
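The hunk above defers the request log until after the send completes so it can include the elapsed time. A minimal standalone sketch of that timing pattern, using only std (`timed` is a hypothetical helper, not part of this codebase):

use std::time::Instant;

// Run a closure and log how long it took, mirroring the Instant::now() /
// duration_since() pattern introduced above (start.elapsed() is equivalent).
fn timed<T>(label: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = f();
    println!("{}: duration {}ms", label, start.elapsed().as_millis());
    result
}

fn main() {
    let sum: u64 = timed("sum", || (0..1_000_000u64).sum());
    println!("sum = {}", sum);
}
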
46 changes: 33 additions & 13 deletions storage/src/cache/cachedfile.rs
@@ -327,7 +327,7 @@ impl BlobCache for FileCacheEntry {
let blob_offset = pending[start].compress_offset();
let blob_end = pending[end].compress_offset() + pending[end].compress_size() as u64;
let blob_size = (blob_end - blob_offset) as usize;
match self.read_chunks(blob_offset, blob_size, &pending[start..end]) {
match self.read_chunks(blob_offset, blob_size, &pending[start..end], true) {
Ok(v) => {
total_size += blob_size;
for idx in start..end {
@@ -408,48 +408,59 @@ impl BlobObject for FileCacheEntry {
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let chunks = meta.get_chunks_compressed(offset, size, RAFS_DEFAULT_CHUNK_SIZE * 2)?;
debug_assert!(!chunks.is_empty());
self.do_fetch_chunks(&chunks)
self.do_fetch_chunks(&chunks, true)
}

fn fetch_range_uncompressed(&self, offset: u64, size: u64) -> Result<usize> {
let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let chunks = meta.get_chunks_uncompressed(offset, size, RAFS_DEFAULT_CHUNK_SIZE * 2)?;
debug_assert!(!chunks.is_empty());
self.do_fetch_chunks(&chunks)
self.do_fetch_chunks(&chunks, false)
}

fn fetch_chunks(&self, range: &BlobIoRange) -> Result<usize> {
fn prefetch_chunks(&self, range: &BlobIoRange) -> Result<usize> {
let chunks = &range.chunks;
if chunks.is_empty() {
return Ok(0);
}

let mut ready_or_pending =
matches!(self.chunk_map.is_ready_or_pending(&chunks[0]), Ok(true));
for idx in 1..chunks.len() {
if chunks[idx - 1].id() + 1 != chunks[idx].id() {
return Err(einval!("chunks for fetch_chunks() must be continuous"));
}
if ready_or_pending
&& !matches!(self.chunk_map.is_ready_or_pending(&chunks[idx]), Ok(true))
{
ready_or_pending = false;
}
}
// All chunks to be prefetched are already ready for use or pending for download; no need to reissue the request.
if ready_or_pending {
return Ok(0);
}

if range.blob_size < RAFS_DEFAULT_CHUNK_SIZE {
let max_size = RAFS_DEFAULT_CHUNK_SIZE - range.blob_size;
if let Some(meta) = self.meta.as_ref() {
if let Some(bm) = meta.get_blob_meta() {
if let Some(chunks) = bm.add_more_chunks(chunks, max_size) {
return self.do_fetch_chunks(&chunks);
return self.do_fetch_chunks(&chunks, true);
}
} else {
return Err(einval!("failed to get blob.meta"));
}
}
}

self.do_fetch_chunks(chunks)
self.do_fetch_chunks(chunks, true)
}
}

impl FileCacheEntry {
fn do_fetch_chunks(&self, chunks: &[BlobIoChunk]) -> Result<usize> {
fn do_fetch_chunks(&self, chunks: &[BlobIoChunk], prefetch: bool) -> Result<usize> {
if self.is_stargz() {
// FIXME: for stargz we need to implement fetching multiple chunks; the
// current workaround incurs heavy overhead and needs to be optimized.
@@ -507,7 +518,12 @@ impl FileCacheEntry {
chunks[end_idx].compress_offset() + chunks[end_idx].compress_size() as u64;
let blob_size = (blob_end - blob_offset) as usize;

match self.read_chunks(blob_offset, blob_size, &chunks[start_idx..=end_idx]) {
match self.read_chunks(
blob_offset,
blob_size,
&chunks[start_idx..=end_idx],
prefetch,
) {
Ok(mut v) => {
total_size += blob_size;
trace!(
@@ -575,7 +591,6 @@ impl FileCacheEntry {
// - Optionally there may be some prefetch/read amplify requests following the user io request.
// - The optional prefetch/read amplify requests may be silently dropped.
fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result<usize> {
debug!("bios {:?}", bios);
// Merge requests with continuous blob addresses.
let requests = self
.merge_requests_for_user(bios, RAFS_DEFAULT_CHUNK_SIZE as usize * 2)
@@ -656,7 +671,7 @@ impl FileCacheEntry {
} else {
BlobIoTag::Internal(chunk.compress_offset())
};
// NOTE: Only this request region can steal more chunks from backend with user io.
// NOTE: Only this request region can read more chunks from backend with user io.
state.push(
RegionType::Backend,
chunk.compress_offset(),
@@ -717,9 +732,14 @@ impl FileCacheEntry {
}

let blob_size = region.blob_len as usize;
debug!("try to read {} bytes from backend", blob_size);

let mut chunks = self.read_chunks(region.blob_address, blob_size, &region.chunks)?;
debug!(
"{} try to read {} bytes of {} chunks from backend",
std::thread::current().name().unwrap_or_default(),
blob_size,
region.chunks.len()
);

let mut chunks = self.read_chunks(region.blob_address, blob_size, &region.chunks, false)?;
assert_eq!(region.chunks.len(), chunks.len());

let mut chunk_buffers = Vec::with_capacity(region.chunks.len());
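The new prefetch_chunks() bails out early when every chunk in the continuous range is already ready or in flight, which is what stops duplicate prefetch I/O. A simplified, hypothetical mirror of that scan, with plain chunk ids standing in for BlobIoChunk:

// Returns Ok(true) when the whole continuous range is ready or in flight,
// i.e. the prefetch request can be dropped without reissuing backend I/O.
fn all_ready_or_pending(
    ids: &[u32],
    ready_or_pending: impl Fn(u32) -> bool,
) -> Result<bool, String> {
    for w in ids.windows(2) {
        if w[0] + 1 != w[1] {
            return Err("chunks for prefetch_chunks() must be continuous".into());
        }
    }
    Ok(ids.iter().all(|&id| ready_or_pending(id)))
}

fn main() {
    let ids = [1u32, 2, 3];
    // Chunk 3 is neither ready nor pending, so the prefetch must be issued.
    assert_eq!(all_ready_or_pending(&ids, |id| id <= 2), Ok(false));
    // Everything is ready or in flight already; the request can be dropped.
    assert_eq!(all_ready_or_pending(&ids, |_| true), Ok(true));
}
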
12 changes: 12 additions & 0 deletions storage/src/cache/mod.rs
@@ -21,6 +21,7 @@ use std::fs::File;
use std::io::Result;
use std::slice;
use std::sync::Arc;
use std::time::Instant;

use fuse_backend_rs::file_buf::FileVolatileSlice;
use nydus_utils::{compress, digest};
@@ -193,9 +194,11 @@ pub trait BlobCache: Send + Sync {
blob_offset: u64,
blob_size: usize,
chunks: &[BlobIoChunk],
prefetch: bool,
) -> Result<Vec<Vec<u8>>> {
// Read all the requested data from the backend in one request.
let mut c_buf = alloc_buf(blob_size);
let start = Instant::now();
let nr_read = self
.reader()
.read(c_buf.as_mut_slice(), blob_offset)
@@ -206,6 +209,15 @@
blob_size, nr_read
)));
}
let duration = Instant::now().duration_since(start).as_millis();
debug!(
"read_chunks: {} {} {} bytes at {}, duration {}ms",
std::thread::current().name().unwrap_or_default(),
if prefetch { "prefetch" } else { "fetch" },
blob_size,
blob_offset,
duration
);

let mut last = blob_offset;
let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
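The debug line added to read_chunks() tags every backend read with the issuing thread's name and whether it came from prefetch or an ordinary fetch. A small sketch of that thread-name attribution (the worker name here is invented):

use std::thread;

fn main() {
    // Prefetch workers run on named threads, so a log line can say which
    // worker issued the backend read; unnamed threads fall back to "".
    let worker = thread::Builder::new()
        .name("prefetch-worker-0".to_string())
        .spawn(|| {
            println!(
                "{} read_chunks: prefetch 1024 bytes at 0",
                thread::current().name().unwrap_or_default()
            );
        })
        .unwrap();
    worker.join().unwrap();
}
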
5 changes: 5 additions & 0 deletions storage/src/cache/state/blob_state_map.rs
@@ -97,6 +97,11 @@
self.c.is_ready(chunk)
}

fn is_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
let index = C::get_index(chunk);
Ok(self.inflight_tracer.lock().unwrap().get(&index).is_some())
}

fn check_ready_and_mark_pending(&self, chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
let mut ready = self.c.is_ready(chunk).map_err(StorageError::CacheIndex)?;

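is_pending() above treats a chunk as in flight while its index has an entry in the inflight tracer. A simplified, hypothetical model of that map (the real tracer keeps richer per-slot state and wakes waiters):

use std::collections::HashMap;
use std::sync::Mutex;

struct InflightTracer {
    // Chunk index -> marker; an entry exists only while some worker is
    // downloading that chunk.
    inflight: Mutex<HashMap<u64, ()>>,
}

impl InflightTracer {
    fn is_pending(&self, index: u64) -> bool {
        self.inflight.lock().unwrap().get(&index).is_some()
    }
}

fn main() {
    let tracer = InflightTracer {
        inflight: Mutex::new(HashMap::new()),
    };
    tracer.inflight.lock().unwrap().insert(42, ());
    assert!(tracer.is_pending(42));
    assert!(!tracer.is_pending(7));
}
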
14 changes: 14 additions & 0 deletions storage/src/cache/state/mod.rs
@@ -56,6 +56,20 @@ pub trait ChunkMap: Any + Send + Sync {
/// Check whether the chunk is ready for use.
fn is_ready(&self, chunk: &dyn BlobChunkInfo) -> Result<bool>;

/// Check whether the chunk is pending for downloading.
fn is_pending(&self, _chunk: &dyn BlobChunkInfo) -> Result<bool> {
Ok(false)
}

/// Check whether a chunk is ready for use or pending for downloading.
fn is_ready_or_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
if matches!(self.is_pending(chunk), Ok(true)) {
Ok(true)
} else {
self.is_ready(chunk)
}
}

/// Check whether the chunk is ready for use, and mark it as pending if not ready yet.
///
/// The function returns:
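Because both new methods ship with default bodies, existing ChunkMap implementors keep compiling and simply report nothing as pending. A hypothetical minimal trait with the same shape:

trait ChunkState {
    fn is_ready(&self, idx: u64) -> bool;

    // Default: maps without an inflight tracer never report a pending chunk.
    fn is_pending(&self, _idx: u64) -> bool {
        false
    }

    fn is_ready_or_pending(&self, idx: u64) -> bool {
        self.is_pending(idx) || self.is_ready(idx)
    }
}

struct AlwaysReady;

impl ChunkState for AlwaysReady {
    // Only is_ready() is written out; the defaults supply the other two.
    fn is_ready(&self, _idx: u64) -> bool {
        true
    }
}

fn main() {
    assert!(AlwaysReady.is_ready_or_pending(0));
    assert!(!AlwaysReady.is_pending(0));
}
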
22 changes: 13 additions & 9 deletions storage/src/cache/worker.rs
@@ -90,7 +90,7 @@ pub(crate) struct AsyncWorkerMgr {
workers: AtomicU32,
active: AtomicBool,

prefetch_sema: Semaphore,
prefetch_sema: Arc<Semaphore>,
prefetch_channel: Arc<Channel<AsyncPrefetchMessage>>,
prefetch_config: Arc<AsyncPrefetchConfig>,
prefetch_delayed: AtomicU64,
@@ -126,7 +126,7 @@ impl AsyncWorkerMgr {
workers: AtomicU32::new(0),
active: AtomicBool::new(false),

prefetch_sema: Semaphore::new(0),
prefetch_sema: Arc::new(Semaphore::new(0)),
prefetch_channel: Arc::new(Channel::new()),
prefetch_config,
prefetch_delayed: AtomicU64::new(0),
@@ -237,16 +237,18 @@ impl AsyncWorkerMgr {
}

async fn handle_prefetch_requests(mgr: Arc<AsyncWorkerMgr>, rt: &Runtime) {
// Max 2 active requests per thread.
mgr.prefetch_sema.add_permits(2);
// Max 1 active request per thread.
mgr.prefetch_sema.add_permits(1);

while let Ok(msg) = mgr.prefetch_channel.recv().await {
mgr.handle_prefetch_rate_limit(&msg).await;
let mgr2 = mgr.clone();

match msg {
AsyncPrefetchMessage::BlobPrefetch(state, blob_cache, offset, size) => {
let _ = mgr2.prefetch_sema.acquire().await;
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();
if state.load(Ordering::Acquire) > 0 {
rt.spawn(async move {
let _ = Self::handle_blob_prefetch_request(
Expand All @@ -257,17 +259,19 @@ impl AsyncWorkerMgr {
state.clone(),
)
.await;
mgr2.prefetch_sema.add_permits(1);
drop(token);
});
}
}
AsyncPrefetchMessage::FsPrefetch(state, blob_cache, req) => {
let _ = mgr2.prefetch_sema.acquire().await;
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();
if state.load(Ordering::Acquire) > 0 {
rt.spawn(async move {
let _ = Self::handle_fs_prefetch_request(mgr2.clone(), blob_cache, req)
.await;
mgr2.prefetch_sema.add_permits(1);
drop(token)
});
}
}
@@ -391,7 +395,7 @@ impl AsyncWorkerMgr {
mgr.metrics.prefetch_data_amount.add(blob_size);

if let Some(obj) = cache.get_blob_object() {
obj.fetch_chunks(&req)?;
obj.prefetch_chunks(&req)?;
} else {
cache.prefetch_range(&req)?;
}
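This hunk appears to be the bug behind the commit title: `let _ = mgr2.prefetch_sema.acquire().await` binds the permit to `_`, which drops (and releases) it immediately, so the semaphore never limited concurrency, while the `add_permits(1)` at the end of each task inflated the permit count on every request. Acquiring an owned permit instead ties the permit's lifetime to the spawned task. A runnable sketch of the fixed pattern (assumes the tokio crate, e.g. with its full feature set):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit, as in the patch: at most one prefetch request in flight.
    let sema = Arc::new(Semaphore::new(1));
    let mut tasks = Vec::new();

    for i in 0..4 {
        // An OwnedSemaphorePermit moves into the spawned task and is
        // released only when `token` drops at the end of that task.
        let token = Semaphore::acquire_owned(sema.clone()).await.unwrap();
        tasks.push(tokio::spawn(async move {
            println!("prefetch request {} running", i);
            drop(token);
        }));
    }
    for t in tasks {
        let _ = t.await;
    }
}
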
4 changes: 2 additions & 2 deletions storage/src/device.rs
@@ -834,8 +834,8 @@ pub trait BlobObject: AsRawFd {
/// for use.
fn fetch_range_uncompressed(&self, offset: u64, size: u64) -> io::Result<usize>;

/// Fetch data for specified chunks from storage backend.
fn fetch_chunks(&self, range: &BlobIoRange) -> io::Result<usize>;
/// Prefetch data for specified chunks from storage backend.
fn prefetch_chunks(&self, range: &BlobIoRange) -> io::Result<usize>;
}

/// A wrapping object over an underlying [BlobCache] object.
2 changes: 1 addition & 1 deletion storage/src/remote/client.rs
@@ -221,7 +221,7 @@ impl BlobObject for RemoteBlob {
}
}

fn fetch_chunks(&self, _range: &BlobIoRange) -> Result<usize> {
fn prefetch_chunks(&self, _range: &BlobIoRange) -> Result<usize> {
Err(enosys!())
}
}
