diff --git a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs index 7d00a36974..453e2cc00d 100644 --- a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs +++ b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs @@ -8,7 +8,7 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString}; use std::ptr; use std::slice; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use libsqlite3_sys::*; use tokio::runtime::Handle; @@ -54,6 +54,9 @@ const MAX_PATHNAME: c_int = 64; /// Maximum number of keys accepted by a single KV put or delete request. const KV_MAX_BATCH_KEYS: usize = 128; +/// Opt-in flag for the native read cache. Disabled by default to match the WASM VFS. +const READ_CACHE_ENV_VAR: &str = "RIVETKIT_SQLITE_NATIVE_READ_CACHE"; + /// First 108 bytes of a valid empty page-1 SQLite database. /// /// This must match `HEADER_PREFIX` in @@ -100,6 +103,16 @@ fn is_valid_file_size(size: i64) -> bool { size >= 0 && (size as u64) <= kv::MAX_FILE_SIZE } +fn read_cache_enabled() -> bool { + static READ_CACHE_ENABLED: OnceLock<bool> = OnceLock::new(); + + *READ_CACHE_ENABLED.get_or_init(|| { + std::env::var(READ_CACHE_ENV_VAR) + .map(|value| matches!(value.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on")) + .unwrap_or(false) + }) +} + // MARK: VFS Metrics /// Per-VFS-callback operation metrics for diagnosing native vs WASM performance. @@ -139,6 +152,7 @@ struct VfsContext { kv: Arc, actor_id: String, main_file_name: String, + read_cache_enabled: bool, rt_handle: Handle, io_methods: Box<sqlite3_io_methods>, vfs_metrics: Arc, @@ -279,16 +293,16 @@ struct KvFileState { /// Read cache: maps chunk keys to their data. Populated on KV gets, /// updated on writes, cleared on truncate/delete. This avoids /// redundant KV round-trips for pages SQLite reads multiple times. 
- read_cache: HashMap<Vec<u8>, Vec<u8>>, + read_cache: Option<HashMap<Vec<u8>, Vec<u8>>>, } impl KvFileState { - fn new() -> Self { + fn new(read_cache_enabled: bool) -> Self { Self { batch_mode: false, dirty_buffer: BTreeMap::new(), saved_file_size: 0, - read_cache: HashMap::new(), + read_cache: read_cache_enabled.then(HashMap::new), } } } @@ -420,9 +434,11 @@ unsafe extern "C" fn kv_io_read( } // Check read cache. let key = kv::get_chunk_key(file.file_tag, chunk_idx as u32); - if let Some(cached) = state.read_cache.get(key.as_slice()) { - buffered_chunks.insert(chunk_idx, cached.clone()); - continue; + if let Some(read_cache) = state.read_cache.as_ref() { + if let Some(cached) = read_cache.get(key.as_slice()) { + buffered_chunks.insert(chunk_idx, cached.clone()); + continue; + } } chunk_keys_to_fetch.push(key.to_vec()); } @@ -435,10 +451,11 @@ unsafe extern "C" fn kv_io_read( } else { match ctx.kv_get(chunk_keys_to_fetch) { Ok(resp) => { - // Populate read cache with fetched values. - for (key, value) in resp.keys.iter().zip(resp.values.iter()) { - if !value.is_empty() { - state.read_cache.insert(key.clone(), value.clone()); + if let Some(read_cache) = state.read_cache.as_mut() { + for (key, value) in resp.keys.iter().zip(resp.values.iter()) { + if !value.is_empty() { + read_cache.insert(key.clone(), value.clone()); + } } } resp @@ -645,13 +662,12 @@ unsafe extern "C" fn kv_io_write( entries_to_write.push((file.meta_key.to_vec(), encode_file_meta(file.size))); } - // Update read cache with the entries we're about to write. - { - let state = get_file_state(file.state); + if let Some(read_cache) = get_file_state(file.state).read_cache.as_mut() { for (key, value) in &entries_to_write { - // Only cache chunk keys, not metadata. + // Only cache chunk keys here. Metadata keys are read on open/access + // and should not be mixed into the per-page cache. 
if key.len() == 8 { - state.read_cache.insert(key.clone(), value.clone()); + read_cache.insert(key.clone(), value.clone()); } } } @@ -702,15 +718,15 @@ unsafe extern "C" fn kv_io_truncate(p_file: *mut sqlite3_file, size: sqlite3_int return SQLITE_OK; } - // Invalidate read cache entries for truncated chunks. - { - let state = get_file_state(file.state); + if let Some(read_cache) = get_file_state(file.state).read_cache.as_mut() { let truncate_from_chunk = if size == 0 { 0u32 } else { (size as u32 / kv::CHUNK_SIZE as u32) + 1 }; - state.read_cache.retain(|key, _| { + // The read cache stores only chunk keys. Keep entries strictly before + // the truncation boundary so reads cannot serve bytes from removed chunks. + read_cache.retain(|key, _| { // Chunk keys are 8 bytes: [prefix, version, CHUNK_PREFIX, file_tag, idx_be32] if key.len() == 8 && key[3] == file.file_tag { let chunk_idx = u32::from_be_bytes([key[4], key[5], key[6], key[7]]); @@ -901,12 +917,14 @@ unsafe extern "C" fn kv_io_file_control( // Move dirty buffer entries into the read cache so subsequent // reads can serve them without a KV round-trip. - let flushed: Vec<_> = std::mem::take(&mut state.dirty_buffer) - .into_iter() - .collect(); - for (chunk_index, data) in flushed { - let key = kv::get_chunk_key(file.file_tag, chunk_index); - state.read_cache.insert(key.to_vec(), data); + let flushed: Vec<_> = std::mem::take(&mut state.dirty_buffer).into_iter().collect(); + if let Some(read_cache) = state.read_cache.as_mut() { + // Only chunk pages belong in the read cache. The metadata write above + // still goes through KV, but should not be cached as a page. 
+ for (chunk_index, data) in flushed { + let key = kv::get_chunk_key(file.file_tag, chunk_index); + read_cache.insert(key.to_vec(), data); + } } file.meta_dirty = false; state.batch_mode = false; @@ -1010,7 +1028,7 @@ unsafe extern "C" fn kv_vfs_open( return SQLITE_CANTOPEN; }; - let state = Box::into_raw(Box::new(KvFileState::new())); + let state = Box::into_raw(Box::new(KvFileState::new(ctx.read_cache_enabled))); let base = sqlite3_file { pMethods: ctx.io_methods.as_ref() as *const sqlite3_io_methods, }; @@ -1205,6 +1223,7 @@ impl KvVfs { kv, actor_id: actor_id.clone(), main_file_name: actor_id, + read_cache_enabled: read_cache_enabled(), rt_handle, io_methods: Box::new(io_methods), vfs_metrics,