Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 46 additions & 27 deletions rivetkit-typescript/packages/sqlite-native/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use libsqlite3_sys::*;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -54,6 +54,9 @@
/// Maximum number of keys accepted by a single KV put or delete request.
const KV_MAX_BATCH_KEYS: usize = 128;

/// Opt-in flag for the native read cache. Disabled by default to match the WASM VFS.
const READ_CACHE_ENV_VAR: &str = "RIVETKIT_SQLITE_NATIVE_READ_CACHE";

/// First 108 bytes of a valid empty page-1 SQLite database.
///
/// This must match `HEADER_PREFIX` in
Expand Down Expand Up @@ -100,6 +103,16 @@
size >= 0 && (size as u64) <= kv::MAX_FILE_SIZE
}

fn read_cache_enabled() -> bool {
static READ_CACHE_ENABLED: OnceLock<bool> = OnceLock::new();

Check warning on line 108 in rivetkit-typescript/packages/sqlite-native/src/vfs.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-typescript/packages/sqlite-native/src/vfs.rs
*READ_CACHE_ENABLED.get_or_init(|| {
std::env::var(READ_CACHE_ENV_VAR)
.map(|value| matches!(value.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
.unwrap_or(false)
})
}

// MARK: VFS Metrics

/// Per-VFS-callback operation metrics for diagnosing native vs WASM performance.
Expand Down Expand Up @@ -139,6 +152,7 @@
kv: Arc<dyn SqliteKv>,
actor_id: String,
main_file_name: String,
read_cache_enabled: bool,
rt_handle: Handle,
io_methods: Box<sqlite3_io_methods>,
vfs_metrics: Arc<VfsMetrics>,
Expand Down Expand Up @@ -279,16 +293,16 @@
/// Read cache: maps chunk keys to their data. Populated on KV gets,
/// updated on writes, cleared on truncate/delete. This avoids
/// redundant KV round-trips for pages SQLite reads multiple times.
read_cache: HashMap<Vec<u8>, Vec<u8>>,
read_cache: Option<HashMap<Vec<u8>, Vec<u8>>>,
}

impl KvFileState {
fn new() -> Self {
fn new(read_cache_enabled: bool) -> Self {
Self {
batch_mode: false,
dirty_buffer: BTreeMap::new(),
saved_file_size: 0,
read_cache: HashMap::new(),
read_cache: read_cache_enabled.then(HashMap::new),
}
}
}
Expand Down Expand Up @@ -420,9 +434,11 @@
}
// Check read cache.
let key = kv::get_chunk_key(file.file_tag, chunk_idx as u32);
if let Some(cached) = state.read_cache.get(key.as_slice()) {
buffered_chunks.insert(chunk_idx, cached.clone());
continue;
if let Some(read_cache) = state.read_cache.as_ref() {
if let Some(cached) = read_cache.get(key.as_slice()) {
buffered_chunks.insert(chunk_idx, cached.clone());
continue;
}
}
chunk_keys_to_fetch.push(key.to_vec());
}
Expand All @@ -435,10 +451,11 @@
} else {
match ctx.kv_get(chunk_keys_to_fetch) {
Ok(resp) => {
// Populate read cache with fetched values.
for (key, value) in resp.keys.iter().zip(resp.values.iter()) {
if !value.is_empty() {
state.read_cache.insert(key.clone(), value.clone());
if let Some(read_cache) = state.read_cache.as_mut() {
for (key, value) in resp.keys.iter().zip(resp.values.iter()) {
if !value.is_empty() {
read_cache.insert(key.clone(), value.clone());
}
}
}
resp
Expand Down Expand Up @@ -645,13 +662,12 @@
entries_to_write.push((file.meta_key.to_vec(), encode_file_meta(file.size)));
}

// Update read cache with the entries we're about to write.
{
let state = get_file_state(file.state);
if let Some(read_cache) = get_file_state(file.state).read_cache.as_mut() {
for (key, value) in &entries_to_write {
// Only cache chunk keys, not metadata.
// Only cache chunk keys here. Metadata keys are read on open/access
// and should not be mixed into the per-page cache.
if key.len() == 8 {
state.read_cache.insert(key.clone(), value.clone());
read_cache.insert(key.clone(), value.clone());
}
}
}
Expand Down Expand Up @@ -702,15 +718,15 @@
return SQLITE_OK;
}

// Invalidate read cache entries for truncated chunks.
{
let state = get_file_state(file.state);
if let Some(read_cache) = get_file_state(file.state).read_cache.as_mut() {
let truncate_from_chunk = if size == 0 {
0u32
} else {
(size as u32 / kv::CHUNK_SIZE as u32) + 1
};
state.read_cache.retain(|key, _| {
// The read cache stores only chunk keys. Keep entries strictly before
// the truncation boundary so reads cannot serve bytes from removed chunks.
read_cache.retain(|key, _| {
// Chunk keys are 8 bytes: [prefix, version, CHUNK_PREFIX, file_tag, idx_be32]
if key.len() == 8 && key[3] == file.file_tag {
let chunk_idx = u32::from_be_bytes([key[4], key[5], key[6], key[7]]);
Expand Down Expand Up @@ -898,15 +914,17 @@
state.batch_mode = false;
return SQLITE_IOERR;
}

Check warning on line 917 in rivetkit-typescript/packages/sqlite-native/src/vfs.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-typescript/packages/sqlite-native/src/vfs.rs
// Move dirty buffer entries into the read cache so subsequent
// reads can serve them without a KV round-trip.
let flushed: Vec<_> = std::mem::take(&mut state.dirty_buffer)
.into_iter()
.collect();
for (chunk_index, data) in flushed {
let key = kv::get_chunk_key(file.file_tag, chunk_index);
state.read_cache.insert(key.to_vec(), data);
let flushed: Vec<_> = std::mem::take(&mut state.dirty_buffer).into_iter().collect();
if let Some(read_cache) = state.read_cache.as_mut() {
// Only chunk pages belong in the read cache. The metadata write above
// still goes through KV, but should not be cached as a page.
for (chunk_index, data) in flushed {
let key = kv::get_chunk_key(file.file_tag, chunk_index);
read_cache.insert(key.to_vec(), data);
}
}
file.meta_dirty = false;
state.batch_mode = false;
Expand Down Expand Up @@ -1010,7 +1028,7 @@
return SQLITE_CANTOPEN;
};

let state = Box::into_raw(Box::new(KvFileState::new()));
let state = Box::into_raw(Box::new(KvFileState::new(ctx.read_cache_enabled)));
let base = sqlite3_file {
pMethods: ctx.io_methods.as_ref() as *const sqlite3_io_methods,
};
Expand Down Expand Up @@ -1205,6 +1223,7 @@
kv,
actor_id: actor_id.clone(),
main_file_name: actor_id,
read_cache_enabled: read_cache_enabled(),
rt_handle,
io_methods: Box::new(io_methods),
vfs_metrics,
Expand Down
Loading