storage: refine debug messages
Refine debug messages for performance analysis.

Signed-off-by: Jiang Liu <[email protected]>
jiangliu committed Aug 2, 2022
1 parent 461f5ca commit a19bf33
Showing 3 changed files with 53 additions and 26 deletions.
34 changes: 20 additions & 14 deletions storage/src/backend/connection.rs
@@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

use reqwest::header::HeaderMap;
use reqwest::{
@@ -346,20 +346,15 @@ impl Connection {
catch_status: bool,
proxy: bool,
) -> ConnectionResult<Response> {
debug!(
"Request: {} {} headers: {:?}, proxy: {}, data: {}",
method,
url,
{
let mut display_headers = headers.clone();
display_headers.remove(HEADER_AUTHORIZATION);
display_headers
},
proxy,
data.is_some(),
);
let display_headers = {
let mut display_headers = headers.clone();
display_headers.remove(HEADER_AUTHORIZATION);
display_headers
};
let has_data = data.is_some();
let start = Instant::now();

let mut rb = client.request(method, url).headers(headers);
let mut rb = client.request(method.clone(), url).headers(headers);
if let Some(q) = query.as_ref() {
rb = rb.query(q);
}
@@ -382,6 +377,17 @@ impl Connection {
ret = rb.body("").send();
}

debug!(
"{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
std::thread::current().name().unwrap_or_default(),
method,
url,
display_headers,
proxy,
has_data,
Instant::now().duration_since(start).as_millis(),
);

match ret {
Err(err) => Err(ConnectionError::Common(err)),
Ok(resp) => respond(resp, catch_status),
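The connection.rs change moves the request log to after the send so it can report the elapsed time, prefixes it with the worker thread's name, and strips the Authorization header before logging; `method.clone()` becomes necessary because the `Method` is moved into `client.request()` but is still printed afterwards. Below is a minimal, self-contained sketch of that timing-and-logging pattern; `log_timed` and its label are illustrative names, not part of the nydus code.

```rust
use std::time::Instant;

// Sketch of the pattern used by the new debug message: capture a start
// Instant, run the operation, then log the thread name, a label, and the
// elapsed milliseconds. `log_timed` is a hypothetical helper, not a nydus API.
fn log_timed<T>(label: &str, op: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = op();
    println!(
        "{} {}: duration: {}ms",
        std::thread::current().name().unwrap_or_default(),
        label,
        Instant::now().duration_since(start).as_millis()
    );
    result
}

fn main() {
    let sum: u64 = log_timed("sum 0..1_000_000", || (0u64..1_000_000).sum());
    println!("sum = {}", sum);
}
```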
33 changes: 21 additions & 12 deletions storage/src/cache/cachedfile.rs
@@ -327,7 +327,7 @@ impl BlobCache for FileCacheEntry {
let blob_offset = pending[start].compress_offset();
let blob_end = pending[end].compress_offset() + pending[end].compress_size() as u64;
let blob_size = (blob_end - blob_offset) as usize;
match self.read_chunks(blob_offset, blob_size, &pending[start..end]) {
match self.read_chunks(blob_offset, blob_size, &pending[start..end], true) {
Ok(v) => {
total_size += blob_size;
for idx in start..end {
@@ -408,15 +408,15 @@ impl BlobObject for FileCacheEntry {
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let chunks = meta.get_chunks_compressed(offset, size, RAFS_DEFAULT_CHUNK_SIZE * 2)?;
debug_assert!(!chunks.is_empty());
self.do_fetch_chunks(&chunks)
self.do_fetch_chunks(&chunks, true)
}

fn fetch_range_uncompressed(&self, offset: u64, size: u64) -> Result<usize> {
let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let chunks = meta.get_chunks_uncompressed(offset, size, RAFS_DEFAULT_CHUNK_SIZE * 2)?;
debug_assert!(!chunks.is_empty());
self.do_fetch_chunks(&chunks)
self.do_fetch_chunks(&chunks, false)
}

fn prefetch_chunks(&self, range: &BlobIoRange) -> Result<usize> {
@@ -447,20 +447,20 @@ impl BlobObject for FileCacheEntry {
if let Some(meta) = self.meta.as_ref() {
if let Some(bm) = meta.get_blob_meta() {
if let Some(chunks) = bm.add_more_chunks(chunks, max_size) {
return self.do_fetch_chunks(&chunks);
return self.do_fetch_chunks(&chunks, true);
}
} else {
return Err(einval!("failed to get blob.meta"));
}
}
}

self.do_fetch_chunks(chunks)
self.do_fetch_chunks(chunks, true)
}
}

impl FileCacheEntry {
fn do_fetch_chunks(&self, chunks: &[BlobIoChunk]) -> Result<usize> {
fn do_fetch_chunks(&self, chunks: &[BlobIoChunk], prefetch: bool) -> Result<usize> {
if self.is_stargz() {
// FIXME: for stargz, we need to implement fetching multiple chunks. here
// is a heavy overhead workaround, needs to be optimized.
@@ -518,7 +518,12 @@
chunks[end_idx].compress_offset() + chunks[end_idx].compress_size() as u64;
let blob_size = (blob_end - blob_offset) as usize;

match self.read_chunks(blob_offset, blob_size, &chunks[start_idx..=end_idx]) {
match self.read_chunks(
blob_offset,
blob_size,
&chunks[start_idx..=end_idx],
prefetch,
) {
Ok(mut v) => {
total_size += blob_size;
trace!(
@@ -586,7 +591,6 @@ impl FileCacheEntry {
// - Optionally there may be some prefetch/read amplify requests following the user io request.
// - The optional prefetch/read amplify requests may be silently dropped.
fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result<usize> {
debug!("bios {:?}", bios);
// Merge requests with continuous blob addresses.
let requests = self
.merge_requests_for_user(bios, RAFS_DEFAULT_CHUNK_SIZE as usize * 2)
@@ -667,7 +671,7 @@ impl FileCacheEntry {
} else {
BlobIoTag::Internal(chunk.compress_offset())
};
// NOTE: Only this request region can steal more chunks from backend with user io.
// NOTE: Only this request region can read more chunks from backend with user io.
state.push(
RegionType::Backend,
chunk.compress_offset(),
@@ -728,9 +732,14 @@ impl FileCacheEntry {
}

let blob_size = region.blob_len as usize;
debug!("try to read {} bytes from backend", blob_size);

let mut chunks = self.read_chunks(region.blob_address, blob_size, &region.chunks)?;
debug!(
"{} try to read {} bytes of {} chunks from backend",
std::thread::current().name().unwrap_or_default(),
blob_size,
region.chunks.len()
);

let mut chunks = self.read_chunks(region.blob_address, blob_size, &region.chunks, false)?;
assert_eq!(region.chunks.len(), chunks.len());

let mut chunk_buffers = Vec::with_capacity(region.chunks.len());
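The cachedfile.rs change threads a new `prefetch: bool` parameter from the fetch entry points through `do_fetch_chunks` down to `read_chunks`, so backend reads can be labeled by origin in the logs. A minimal sketch of that flag-threading pattern follows; `Cache`, `fetch_range_*`, and `do_fetch` are hypothetical names standing in for the real types.

```rust
// Sketch: the compressed-range path passes `true`, the uncompressed path
// passes `false`, and the shared helper only uses the flag to label its
// log line, mirroring how the commit distinguishes prefetch from fetch I/O.
struct Cache;

impl Cache {
    fn fetch_range_compressed(&self) -> usize {
        self.do_fetch(true) // treated as a prefetch-style read
    }

    fn fetch_range_uncompressed(&self) -> usize {
        self.do_fetch(false) // treated as an on-demand read
    }

    fn do_fetch(&self, prefetch: bool) -> usize {
        let kind = if prefetch { "prefetch" } else { "fetch" };
        println!("{}: reading chunks from backend", kind);
        0
    }
}

fn main() {
    let cache = Cache;
    cache.fetch_range_compressed();
    cache.fetch_range_uncompressed();
}
```

Passing the flag explicitly keeps the shared read path free of any global state while still letting performance analysis separate prefetch traffic from user-driven reads.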
12 changes: 12 additions & 0 deletions storage/src/cache/mod.rs
@@ -21,6 +21,7 @@ use std::fs::File;
use std::io::Result;
use std::slice;
use std::sync::Arc;
use std::time::Instant;

use fuse_backend_rs::file_buf::FileVolatileSlice;
use nydus_utils::{compress, digest};
@@ -193,9 +194,11 @@ pub trait BlobCache: Send + Sync {
blob_offset: u64,
blob_size: usize,
chunks: &[BlobIoChunk],
prefetch: bool,
) -> Result<Vec<Vec<u8>>> {
// Read all the requested data from the backend in a single request.
let mut c_buf = alloc_buf(blob_size);
let start = Instant::now();
let nr_read = self
.reader()
.read(c_buf.as_mut_slice(), blob_offset)
@@ -206,6 +209,15 @@
blob_size, nr_read
)));
}
let duration = Instant::now().duration_since(start).as_millis();
debug!(
"read_chunks: {} {} {} bytes at {}, duration {}ms",
std::thread::current().name().unwrap_or_default(),
if prefetch { "prefetch" } else { "fetch" },
blob_size,
blob_offset,
duration
);

let mut last = blob_offset;
let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
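The mod.rs change wraps the backend read in `BlobCache::read_chunks` with an `Instant`-based timer and logs the byte count, blob offset, and duration. A small standalone sketch of the same measurement is below, with `std::io::repeat(...)` standing in for the blob backend; note that `start.elapsed()` is equivalent to the `Instant::now().duration_since(start)` form used in the commit.

```rust
use std::io::Read;
use std::time::Instant;

// Sketch: time a backend-style read and log bytes, offset, and elapsed
// milliseconds, in the spirit of the new read_chunks debug message.
fn main() -> std::io::Result<()> {
    let blob_offset = 0u64;
    let blob_size = 4096usize;
    let mut backend = std::io::repeat(0u8).take(blob_size as u64); // dummy backend
    let mut buf = vec![0u8; blob_size];

    let start = Instant::now();
    let nr_read = backend.read(&mut buf)?;

    println!(
        "read_chunks: {} bytes at {}, duration {}ms",
        nr_read,
        blob_offset,
        start.elapsed().as_millis()
    );
    Ok(())
}
```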
