Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add one config to limit max disk usage for spilling queries #14975

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result};
use datafusion_execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::{DiskManager, TaskContext};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use test_utils::AccessLogGenerator;
Expand Down Expand Up @@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
let _ = df.collect().await.expect("Query execution failed");
}

// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
// ------------------------------------------------------------------

// Create a new `SessionContext` with speicified disk limit and memory pool limit
async fn setup_context(
disk_limit: u64,
memory_pool_limit: usize,
) -> Result<SessionContext> {
let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
.with_max_temp_directory_size(disk_limit)?;

let runtime = RuntimeEnvBuilder::new()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use the builder to specify disk limit for now, because DiskManagerConfig is an enum instead of struct, so now the setup routine is a bit hacky.
Changing it I think will inevitably cause API change, so I prefer to leave it to a separate PR.

.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
.build_arc()
.unwrap();

let runtime = Arc::new(RuntimeEnv {
memory_pool: runtime.memory_pool.clone(),
disk_manager: Arc::new(disk_manager),
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
});

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB
.with_target_partitions(1);

Ok(SessionContext::new_with_config_rt(config, runtime))
}

/// If the spilled bytes exceed the disk limit, the query should fail
/// (specified by `max_temp_directory_size` in `DiskManager`)
#[tokio::test]
async fn test_disk_spill_limit_reached() -> Result<()> {
    // 100MB disk limit, 60MB memory pool: the huge sort below must spill.
    let ctx = setup_context(100 * 1024 * 1024, 60 * 1024 * 1024).await?;

    // An effectively unbounded sort input guarantees the spilled data
    // eventually exceeds the configured disk limit.
    let df = ctx
        .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
        .await?;

    // The query itself is expected to fail with the disk-limit resource error.
    let err = df.collect().await.unwrap_err();
    assert_contains!(
        err.to_string(),
        "The used disk space during the spilling process has exceeded the allowable limit"
    );

    Ok(())
}

/// External query should succeed, if the spilled bytes is less than the disk limit
#[tokio::test]
async fn test_disk_spill_limit_not_reached() -> Result<()> {
let disk_spill_limit = 100 * 1024 * 1024; // 100MB
let ctx = setup_context(disk_spill_limit, 60 * 1024 * 1024).await?;

let df = ctx
.sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();

println!("spill count {}, spill bytes {}", spill_count, spilled_bytes);
assert!(spill_count > 0);
assert!((spilled_bytes as u64) < disk_spill_limit);

Ok(())
}

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
Expand Down
4 changes: 1 addition & 3 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ name = "datafusion_execution"

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
Expand All @@ -49,6 +50,3 @@ parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
210 changes: 209 additions & 1 deletion datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@

//! [`DiskManager`]: Manages files generated during query execution

use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use datafusion_common::{
config_err, exec_datafusion_err, internal_err, resources_datafusion_err,
resources_err, DataFusionError, Result,
};
use log::debug;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};

use crate::memory_pool::human_readable_size;
use crate::metrics::{Count, SpillMetrics};

const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB

/// Configuration for temporary disk access
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
Expand Down Expand Up @@ -75,6 +87,14 @@ pub struct DiskManager {
/// If `Some(vec![])` a new OS specified temporary directory will be created
/// If `None` an error will be returned (configured not to spill)
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,

/// The maximum amount of data (in bytes) stored inside the temporary directories.
/// Default to 100GB
max_temp_directory_size: u64,

/// Used disk space in the temporary directories. Now only spilled data for
/// external executors are counted.
used_disk_space: Count,
}

impl DiskManager {
Expand All @@ -84,6 +104,8 @@ impl DiskManager {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Count::default(),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
Expand All @@ -93,14 +115,67 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Count::default(),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Count::default(),
})),
}
}

/// Like [`DiskManager::try_new`], but returns the `DiskManager` by value
/// instead of wrapped in an `Arc`, so callers can keep configuring it
/// (e.g. with [`Self::with_max_temp_directory_size`]) before sharing it.
///
/// # Errors
///
/// For [`DiskManagerConfig::Existing`], fails if other `Arc` references to
/// the manager are still alive (the `Arc` cannot be unwrapped).
pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
    // The `Existing` variant is the only one that can fail to produce an
    // owned value: unwrapping requires a unique reference.
    if let DiskManagerConfig::Existing(manager) = config {
        return Arc::try_unwrap(manager).map_err(|_| {
            DataFusionError::Internal("Failed to unwrap Arc".to_string())
        });
    }

    // The remaining variants differ only in how `local_dirs` is initialized,
    // so compute that first and construct `Self` in a single place.
    let local_dirs = match config {
        DiskManagerConfig::NewOs => Some(vec![]),
        DiskManagerConfig::NewSpecified(conf_dirs) => {
            let local_dirs = create_local_dirs(conf_dirs)?;
            debug!(
                "Created local dirs {:?} as DataFusion working directory",
                local_dirs
            );
            Some(local_dirs)
        }
        DiskManagerConfig::Disabled => None,
        DiskManagerConfig::Existing(_) => unreachable!("handled above"),
    };

    Ok(Self {
        local_dirs: Mutex::new(local_dirs),
        max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
        used_disk_space: Count::default(),
    })
}

/// Set the maximum amount of data (in bytes) stored inside the temporary directories.
pub fn with_max_temp_directory_size(
    mut self,
    max_temp_directory_size: u64,
) -> Result<Self> {
    // A disabled disk manager never writes temporary files, so setting a
    // non-zero limit on it would be meaningless — fail early instead of
    // silently accepting a configuration that can never take effect.
    let spilling_disabled = self.local_dirs.lock().is_none();
    if spilling_disabled && max_temp_directory_size != 0 {
        return config_err!(
            "Cannot set max temp directory size for disabled disk manager"
        );
    }

    self.max_temp_directory_size = max_temp_directory_size;
    Ok(self)
}

/// Return true if this disk manager supports creating temporary
/// files. If this returns false, any call to `create_tmp_file`
/// will error.
Expand Down Expand Up @@ -144,6 +219,94 @@ impl DiskManager {
.map_err(DataFusionError::IoError)?,
})
}

/// Write record batches to a temporary file, and return the spill file handle.
///
/// This method is used within executors with spilling capabilities to write
/// temporary `RecordBatch`es to disk. Resource errors are returned if the written
/// file size exceeds the disk limit specified in `max_temp_directory_size` from
/// `DiskManager`.
///
/// The limit check is performed per batch *before* the batch is written, using
/// an in-memory size estimate (`get_array_memory_size`), so the on-disk usage
/// tracked in `used_disk_space` is an approximation of actual file size.
///
/// NOTE(review): `used_disk_space` is only ever incremented here — it is not
/// decremented when a spill file is dropped/deleted, and an error partway
/// through the loop leaves earlier increments counted even though the temp
/// file is discarded. Presumably the counter is an upper bound by design —
/// TODO confirm.
///
/// # Arguments
///
/// * `batches` - A slice of `RecordBatch` to be written to disk. Note that this
///   slice can't be empty.
/// * `request_description` - A description of the request for logging and error messages.
/// * `caller_spill_metrics` - Metrics to be updated with the spill operation details, from the calling executor.
pub fn try_spill_record_batches(
    &self,
    batches: &[RecordBatch],
    request_description: &str,
    caller_spill_metrics: &mut SpillMetrics,
) -> Result<RefCountedTempFile> {
    if batches.is_empty() {
        return internal_err!(
            "`try_spill_record_batches` requires at least one batch"
        );
    }

    let spill_file = self.create_tmp_file(request_description)?;
    // All batches in a spill share one schema; take it from the first batch.
    let schema = batches[0].schema();

    let mut stream_writer = IPCStreamWriter::new(spill_file.path(), schema.as_ref())?;

    for batch in batches {
        // The IPC Stream writer does not have a mechanism to avoid writing duplicate
        // `Buffer`s repeatedly, so we do not use `get_record_batch_memory_size()`
        // to estimate the memory size with duplicated `Buffer`s.
        let estimate_extra_size = batch.get_array_memory_size();

        // Check the limit before writing, so the limit is never exceeded on disk
        // by more than one batch's worth of data.
        if (self.used_disk_space.value() + estimate_extra_size) as u64
            > self.max_temp_directory_size
        {
            return resources_err!(
                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
                human_readable_size(self.max_temp_directory_size as usize)
            );
        }

        self.used_disk_space.add(estimate_extra_size);
        stream_writer.write(batch)?;
    }

    stream_writer.finish()?;

    // Update calling executor's spill metrics
    caller_spill_metrics
        .spilled_bytes
        .add(stream_writer.num_bytes);
    caller_spill_metrics
        .spilled_rows
        .add(stream_writer.num_rows);
    caller_spill_metrics.spill_file_count.add(1);

    Ok(spill_file)
}

/// Refer to the documentation for [`Self::try_spill_record_batches`]. This method
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
pub fn try_spill_record_batch_by_size(
&self,
batch: &RecordBatch,
request_description: &str,
spill_metrics: &mut SpillMetrics,
row_limit: usize,
) -> Result<RefCountedTempFile> {
let total_rows = batch.num_rows();
let mut batches = Vec::new();
let mut offset = 0;

// It's ok to calculate all slices first, because slicing is zero-copy.
while offset < total_rows {
let length = std::cmp::min(total_rows - offset, row_limit);
let sliced_batch = batch.slice(offset, length);
batches.push(sliced_batch);
offset += length;
}

// Spill the sliced batches to disk
self.try_spill_record_batches(&batches, request_description, spill_metrics)
}
}

/// A wrapper around a [`NamedTempFile`] that also contains
Expand Down Expand Up @@ -183,6 +346,51 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
.collect()
}

/// Write in Arrow IPC Stream format to a file.
///
/// Stream format is used for spill because it supports dictionary replacement, and the random
/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
pub struct IPCStreamWriter {
    /// Inner writer
    pub writer: StreamWriter<File>,
    /// Batches written
    pub num_batches: usize,
    /// Rows written
    pub num_rows: usize,
    /// Bytes written — accumulated from `get_array_memory_size()` per batch,
    /// i.e. an in-memory size estimate rather than the exact number of bytes
    /// written to disk (presumably an upper bound — see `write`).
    pub num_bytes: usize,
}

impl IPCStreamWriter {
    /// Create a new IPC stream writer that writes to a freshly created file
    /// at `path`, with all counters starting at zero.
    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
        let file = File::create(path).map_err(|e| {
            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
        })?;
        let writer = StreamWriter::try_new(file, schema)?;
        Ok(Self {
            writer,
            num_batches: 0,
            num_rows: 0,
            num_bytes: 0,
        })
    }

    /// Write one single batch, updating the batch/row/byte counters.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.writer.write(batch)?;
        self.num_batches += 1;
        self.num_rows += batch.num_rows();
        self.num_bytes += batch.get_array_memory_size();
        Ok(())
    }

    /// Finish the writer, emitting the stream's end-of-stream marker.
    pub fn finish(&mut self) -> Result<()> {
        self.writer.finish().map_err(Into::into)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub mod registry {
};
}

pub use disk_manager::DiskManager;
pub use disk_manager::{DiskManager, IPCStreamWriter};
pub mod metrics;
pub use registry::FunctionRegistry;
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use task::TaskContext;
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::{
/// case of constant strings
///
/// ```rust
/// use datafusion_physical_plan::metrics::*;
/// use datafusion_execution::metrics::*;
///
/// let metrics = ExecutionPlanMetricsSet::new();
/// let partition = 1;
Expand Down
Loading