Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 223 additions & 22 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl DiskManagerBuilder {
match self.mode {
DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: self.max_temp_directory_size,
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
Expand All @@ -86,14 +86,14 @@ impl DiskManagerBuilder {
);
Ok(DiskManager {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: self.max_temp_directory_size,
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})
}
DiskManagerMode::Disabled => Ok(DiskManager {
local_dirs: Mutex::new(None),
max_temp_directory_size: self.max_temp_directory_size,
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
Expand Down Expand Up @@ -167,8 +167,9 @@ pub struct DiskManager {
/// If `None` an error will be returned (configured not to spill)
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
/// The maximum amount of data (in bytes) stored inside the temporary directories.
/// Default to 100GB
max_temp_directory_size: u64,
/// Default to 100GB. Stored as `AtomicU64` so it can be adjusted at runtime
/// without requiring exclusive (`&mut`) access to the `DiskManager`.
max_temp_directory_size: AtomicU64,
/// Used disk space in the temporary directories. Now only spilled data for
/// external executors are counted.
used_disk_space: Arc<AtomicU64>,
Expand Down Expand Up @@ -199,7 +200,7 @@ impl DiskManager {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
Expand All @@ -210,53 +211,73 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}

/// Set the max temp directory size. Requires exclusive access.
///
/// Prefer [`Self::update_max_temp_directory_size`] which takes `&self` and
/// works through `Arc` without exclusive access.
#[deprecated(
since = "54.0.0",
note = "Use `update_max_temp_directory_size` instead, which takes &self and works through Arc."
)]
pub fn set_max_temp_directory_size(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_max_temp_directory_size(&mut self, ...) is now redundant. It does exactly the same thing as update_max_temp_directory_size but requires &mut. Keeping both is mildly confusing

&mut self,
max_temp_directory_size: u64,
) -> Result<()> {
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
// this operation is not meaningful, fail early.
self.update_max_temp_directory_size(max_temp_directory_size)
}

/// Atomically update the maximum temp directory size at runtime.
///
/// Because this takes `&self` rather than `&mut self`, the limit can be
/// changed through a shared `Arc<DiskManager>` while queries are running
/// (e.g. to adapt to currently available disk space). The new value takes
/// effect for subsequent spill writes.
///
/// # Errors
///
/// Returns a configuration error when spilling is disabled (no local
/// directories) and a non-zero limit is requested, since the limit would
/// never be consulted in that configuration.
pub fn update_max_temp_directory_size(
    &self,
    max_temp_directory_size: u64,
) -> Result<()> {
    let spilling_disabled = self.local_dirs.lock().is_none();
    if spilling_disabled && max_temp_directory_size != 0 {
        return config_err!(
            "Cannot set max temp directory size for a disk manager that spilling is disabled"
        );
    }

    // Release pairs with the Acquire loads in readers, so a reader that
    // observes the store also observes everything sequenced before it.
    self.max_temp_directory_size
        .store(max_temp_directory_size, Ordering::Release);
    Ok(())
}

/// Set the max temp directory size through an `Arc<DiskManager>`.
///
/// Since the limit is now stored in an `AtomicU64`, no exclusive access to
/// the `Arc` is required; this simply delegates to
/// [`Self::update_max_temp_directory_size`].
///
/// # Errors
///
/// Propagates the configuration error from the delegate when spilling is
/// disabled and a non-zero limit is requested.
#[deprecated(
    since = "54.0.0",
    note = "Use `update_max_temp_directory_size` instead"
)]
pub fn set_arc_max_temp_directory_size(
    this: &Arc<Self>,
    max_temp_directory_size: u64,
) -> Result<()> {
    this.update_max_temp_directory_size(max_temp_directory_size)
}

/// Builder-style setter for the max temp directory size.
///
/// Consumes and returns `self` so calls can be chained during construction.
///
/// # Errors
///
/// Propagates the configuration error from
/// [`Self::update_max_temp_directory_size`] when spilling is disabled and
/// a non-zero limit is requested.
pub fn with_max_temp_directory_size(
    self,
    max_temp_directory_size: u64,
) -> Result<Self> {
    // The limit is an AtomicU64, so no `mut self` binding is needed.
    self.update_max_temp_directory_size(max_temp_directory_size)?;
    Ok(self)
}

Expand All @@ -266,7 +287,7 @@ impl DiskManager {

/// Returns the maximum temporary directory size in bytes
pub fn max_temp_directory_size(&self) -> u64 {
self.max_temp_directory_size
self.max_temp_directory_size.load(Ordering::Acquire)
}

/// Returns the current spilling progress
Expand Down Expand Up @@ -418,11 +439,12 @@ impl RefCountedTempFile {

// 3. Check if the updated global disk usage exceeds the configured limit
let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
if global_disk_usage > self.disk_manager.max_temp_directory_size {
let limit = self.disk_manager.max_temp_directory_size.load(Ordering::Acquire);
if global_disk_usage > limit {
return resources_err!(
"The used disk space during the spilling process has exceeded the allowable limit of {}. \
Please try increasing the config: `datafusion.runtime.max_temp_directory_size`.",
human_readable_size(self.disk_manager.max_temp_directory_size as usize)
human_readable_size(limit as usize)
);
}

Expand Down Expand Up @@ -796,4 +818,183 @@ mod tests {

Ok(())
}

#[test]
fn test_dynamic_limit_adjustment_through_shared_ref() -> Result<()> {
    // Verify that `update_max_temp_directory_size` works through &self
    // (not &mut self). This is the key behavioral change: the limit can be
    // adjusted at runtime without exclusive access, enabling dynamic
    // resizing while queries are running.
    let dm = DiskManager::builder()
        .with_max_temp_directory_size(1024)
        .build()?;
    let dm = Arc::new(dm);

    assert_eq!(dm.max_temp_directory_size(), 1024);

    // Adjust through a shared reference (simulates concurrent access via Arc)
    dm.update_max_temp_directory_size(2048)?;
    assert_eq!(dm.max_temp_directory_size(), 2048);

    // Decreasing the limit works the same way
    dm.update_max_temp_directory_size(512)?;
    assert_eq!(dm.max_temp_directory_size(), 512);

    Ok(())
}

#[test]
fn test_dynamic_limit_concurrent_access() -> Result<()> {
    // The limit must be safely readable and writable from many threads at once.
    let dm = Arc::new(
        DiskManager::builder()
            .with_max_temp_directory_size(1000)
            .build()?,
    );

    let mut workers = Vec::with_capacity(8);
    for i in 0..8 {
        let dm = Arc::clone(&dm);
        workers.push(std::thread::spawn(move || {
            // Every worker publishes its own limit, then observes whatever
            // limit is current (set by itself or by a sibling thread).
            let new_limit = (i + 1) * 1000;
            dm.update_max_temp_directory_size(new_limit).unwrap();
            let observed = dm.max_temp_directory_size();
            assert!((1000..=8000).contains(&observed));
        }));
    }

    for worker in workers {
        worker.join().unwrap();
    }

    // Whichever thread wrote last, the final value is one of the eight limits.
    let final_val = dm.max_temp_directory_size();
    assert!((1000..=8000).contains(&final_val));

    Ok(())
}

#[test]
fn test_disabled_disk_manager_rejects_nonzero_limit() -> Result<()> {
    let dm = Arc::new(
        DiskManager::builder()
            .with_mode(DiskManagerMode::Disabled)
            .build()?,
    );

    // A disabled manager never spills, so a non-zero limit is meaningless
    // and must be rejected.
    assert!(dm.update_max_temp_directory_size(1024).is_err());

    // Zero is the one value that is always accepted.
    assert!(dm.update_max_temp_directory_size(0).is_ok());

    Ok(())
}

#[test]
fn test_limit_decrease_below_current_usage() -> Result<()> {
    // Scenario: DiskManager has a 100GB limit and 80GB is currently in use,
    // then an admin lowers the limit to 60GB. What happens?
    //
    // Expected behavior:
    // - Existing spill files remain on disk (not deleted)
    // - used_disk_space still reports 80GB
    // - New spill writes FAIL immediately (80GB > 60GB new limit)
    // - Once old queries complete and release their files (usage drops below
    //   60GB), new spill writes succeed again
    //
    // This demonstrates graceful degradation: lowering the limit doesn't
    // reclaim existing files (which would break running queries), but it
    // prevents additional spilling until usage drops naturally.
    const GB: u64 = 1024 * 1024 * 1024;

    let dm = DiskManager::builder()
        .with_max_temp_directory_size(100 * GB)
        .build()?;
    let dm = Arc::new(dm);

    // Simulate 80GB of existing spill usage
    dm.used_disk_space.store(80 * GB, Ordering::Relaxed);

    assert_eq!(dm.max_temp_directory_size(), 100 * GB);
    assert_eq!(dm.used_disk_space(), 80 * GB);

    // Lower the limit to 60GB (below current usage)
    dm.update_max_temp_directory_size(60 * GB)?;
    assert_eq!(dm.max_temp_directory_size(), 60 * GB);

    // Current usage (80GB) now exceeds the new limit (60GB).
    // used_disk_space is NOT reclaimed — existing files stay.
    assert_eq!(dm.used_disk_space(), 80 * GB);

    // Any attempt to write MORE would be rejected at the SpillWriter level
    // because used_disk_space (80GB) > max_temp_directory_size (60GB).
    // (SpillWriter check: `global_disk_usage > limit` returns ResourcesExhausted)

    // Simulate old queries completing: usage drops to 50GB
    dm.used_disk_space.store(50 * GB, Ordering::Relaxed);

    // Now usage (50GB) < limit (60GB) — new spill writes would succeed again
    assert!(dm.used_disk_space() < dm.max_temp_directory_size());

    Ok(())
}

#[test]
fn test_limit_decrease_with_concurrent_queries() -> Result<()> {
    // Scenario: Multiple threads spilling while the limit is lowered concurrently.
    // Demonstrates that:
    // 1. In-flight spills that started before the limit change complete normally
    //    (they already incremented used_disk_space)
    // 2. New spills after the limit change respect the new lower limit
    // 3. No data corruption or panics from concurrent access
    let dm = Arc::new(
        DiskManager::builder()
            .with_max_temp_directory_size(100 * 1024 * 1024) // 100MB
            .build()?,
    );

    // Barrier of 5 participants: the 4 spill threads plus the 1 resize
    // thread all release together, forcing the resize to race the spills.
    let barrier = Arc::new(std::sync::Barrier::new(5));

    // 4 threads simulate concurrent spilling
    let spill_handles: Vec<_> = (0..4)
        .map(|_| {
            let dm = Arc::clone(&dm);
            let barrier = Arc::clone(&barrier);
            std::thread::spawn(move || {
                barrier.wait();
                // Simulate spill: increment used_disk_space
                dm.used_disk_space.fetch_add(10 * 1024 * 1024, Ordering::Relaxed);
                // Brief sleep keeps the spill "in flight" while the resize
                // thread lowers the limit.
                std::thread::sleep(std::time::Duration::from_millis(10));
                // Simulate cleanup: release the spilled bytes
                dm.used_disk_space.fetch_sub(10 * 1024 * 1024, Ordering::Relaxed);
            })
        })
        .collect();

    // 1 thread lowers the limit mid-flight
    let dm_resize = Arc::clone(&dm);
    let resize_barrier = Arc::clone(&barrier);
    let resize_handle = std::thread::spawn(move || {
        resize_barrier.wait();
        // Lower limit while spills are in progress
        dm_resize
            .update_max_temp_directory_size(30 * 1024 * 1024) // 30MB
            .unwrap();
    });

    for h in spill_handles {
        h.join().unwrap();
    }
    resize_handle.join().unwrap();

    // After all threads complete:
    // - Limit is 30MB (the only value any thread wrote)
    // - used_disk_space is 0 (every fetch_add was matched by a fetch_sub)
    // - No panics, no corruption
    assert_eq!(dm.max_temp_directory_size(), 30 * 1024 * 1024);
    assert_eq!(dm.used_disk_space(), 0);

    Ok(())
}
}