Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions rivetkit-typescript/packages/sqlite-native/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,20 +393,18 @@
let end_chunk = (offset + requested_length - 1) / kv::CHUNK_SIZE;

let mut chunk_keys_to_fetch = Vec::new();
let mut buffered_chunks: HashMap<usize, Vec<u8>> = HashMap::new();
let mut buffered_chunks: HashMap<usize, &[u8]> = HashMap::new();
// Skip fetching chunks already present in the dirty buffer (batch mode) or read cache.
for chunk_idx in start_chunk..=end_chunk {
// Check dirty buffer first (batch mode writes).
if state.batch_mode {
if let Some(buffered) = state.dirty_buffer.get(&(chunk_idx as u32)) {
buffered_chunks.insert(chunk_idx, buffered.clone());
if state.dirty_buffer.contains_key(&(chunk_idx as u32)) {
continue;
}
}
// Check read cache.
let key = kv::get_chunk_key(file.file_tag, chunk_idx as u32);
if let Some(read_cache) = state.read_cache.as_ref() {
if let Some(cached) = read_cache.get(key.as_slice()) {
buffered_chunks.insert(chunk_idx, cached.clone());
buffered_chunks.insert(chunk_idx, cached.as_slice());
continue;
}
}
Expand All @@ -420,29 +418,26 @@
}
} else {
match ctx.kv_get(chunk_keys_to_fetch) {
Ok(resp) => {
if let Some(read_cache) = state.read_cache.as_mut() {
for (key, value) in resp.keys.iter().zip(resp.values.iter()) {
if !value.is_empty() {
read_cache.insert(key.clone(), value.clone());
}
}
}
resp
}
Ok(resp) => resp,
Err(_) => return SQLITE_IOERR_READ,
}
};
let value_map = build_value_map(&resp);

for chunk_idx in start_chunk..=end_chunk {
let chunk_data: Option<&[u8]> = buffered_chunks
.get(&chunk_idx)
.map(|v| v.as_slice())
.or_else(|| {
let key = kv::get_chunk_key(file.file_tag, chunk_idx as u32);
value_map.get(key.as_slice()).copied()
});
let chunk_data = if state.batch_mode {
state
.dirty_buffer
.get(&(chunk_idx as u32))
.map(|buffered| buffered.as_slice())
} else {
None
}
.or_else(|| buffered_chunks.get(&chunk_idx).copied())
.or_else(|| {
let chunk_key = kv::get_chunk_key(file.file_tag, chunk_idx as u32);
value_map.get(chunk_key.as_slice()).copied()
});
let chunk_offset = chunk_idx * kv::CHUNK_SIZE;
let read_start = offset.saturating_sub(chunk_offset);
let read_end = std::cmp::min(kv::CHUNK_SIZE, offset + requested_length - chunk_offset);
Expand All @@ -465,6 +460,16 @@
}
}

// `resp` is empty when every chunk was served from the dirty buffer or read cache.
// In that case this loop is a no-op.
if let Some(read_cache) = state.read_cache.as_mut() {
for (key, value) in resp.keys.iter().zip(resp.values.iter()) {
if !value.is_empty() {
read_cache.insert(key.clone(), value.clone());
}
}
}

let actual_bytes = std::cmp::min(requested_length, file_size - offset);
if actual_bytes < requested_length {
buf[actual_bytes..].fill(0);
Expand Down Expand Up @@ -569,7 +574,7 @@
let chunk_key = kv::get_chunk_key(file.file_tag, chunk_idx as u32).to_vec();
let cached_chunk = if needs_existing && ctx.read_cache_enabled {
let state = get_file_state(file.state);
state.read_cache.get(chunk_key.as_slice()).cloned()

Check failure on line 577 in rivetkit-typescript/packages/sqlite-native/src/vfs.rs

View workflow job for this annotation

GitHub Actions / Check

no method named `get` found for enum `Option<T>` in the current scope
} else {
None
};
Expand Down Expand Up @@ -609,7 +614,7 @@
let mut entries_to_write = Vec::with_capacity(plans.len() + 1);
for plan in &plans {
let existing_chunk = plan.cached_chunk.as_deref().or_else(|| {
plan.existing_chunk_index

Check failure on line 617 in rivetkit-typescript/packages/sqlite-native/src/vfs.rs

View workflow job for this annotation

GitHub Actions / Check

mismatched types
.and_then(|idx| existing_chunks.get(idx))
.and_then(|value| value.as_ref())
});
Expand Down
Loading