Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@
[workspace.dependencies.rivet-envoy-protocol]
path = "engine/sdks/rust/envoy-protocol"

[workspace.dependencies.rivetkit-sqlite-native]
path = "rivetkit-typescript/packages/sqlite-native"

[workspace.dependencies.epoxy-protocol]
path = "engine/sdks/rust/epoxy-protocol"

Expand All @@ -524,7 +527,7 @@
[workspace.dependencies.rivet-ups-protocol]
path = "engine/sdks/rust/ups-protocol"

[workspace.dependencies.rivetkit-sqlite-native]

Check failure on line 530 in Cargo.toml

View workflow job for this annotation

GitHub Actions / Check

duplicate key
path = "rivetkit-typescript/packages/sqlite-native"

[profile.dev]
Expand Down
2 changes: 2 additions & 0 deletions rivetkit-typescript/packages/rivetkit-native/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* tslint:disable */

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-native/index.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
/* eslint-disable */

/* auto-generated by NAPI-RS */
Expand Down Expand Up @@ -27,6 +27,7 @@
poolName: string
version: number
metadata?: any
notGlobal: boolean
/**
* Log level for the Rust tracing subscriber (e.g. "trace", "debug", "info", "warn", "error").
* Falls back to RIVET_LOG_LEVEL, then LOG_LEVEL, then RUST_LOG env vars. Defaults to "warn".
Expand Down Expand Up @@ -59,6 +60,7 @@
export declare function startEnvoyJs(config: JsEnvoyConfig, eventCallback: (event: any) => void): JsEnvoyHandle
/** Native SQLite database handle exposed to JavaScript. */
export declare class JsNativeDatabase {
takeLastKvError(): string | null
run(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<ExecuteResult>
query(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<QueryResult>
exec(sql: string): Promise<QueryResult>
Expand Down
16 changes: 16 additions & 0 deletions rivetkit-typescript/packages/rivetkit-native/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl EnvoyKv {

#[async_trait]
impl SqliteKv for EnvoyKv {
fn on_error(&self, actor_id: &str, error: &SqliteKvError) {
tracing::error!(%actor_id, %error, "native sqlite kv operation failed");
}

async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
Ok(())
}
Expand Down Expand Up @@ -115,6 +119,13 @@ impl JsNativeDatabase {
.and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr))
.unwrap_or(ptr::null_mut())
}

fn take_last_kv_error_inner(&self) -> Option<String> {
self.db
.lock()
.ok()
.and_then(|guard| guard.as_ref().and_then(NativeDatabase::take_last_kv_error))
}
}

#[napi(object)]
Expand All @@ -139,6 +150,11 @@ pub struct QueryResult {

#[napi]
impl JsNativeDatabase {
#[napi]
pub fn take_last_kv_error(&self) -> Option<String> {
self.take_last_kv_error_inner()
}

#[napi]
pub async fn run(
&self,
Expand Down
38 changes: 32 additions & 6 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function startEnvoySync(config) {
poolName: config.poolName,
version: config.version,
metadata: config.metadata || null,
notGlobal: config.notGlobal,
notGlobal: config.notGlobal ?? false,
},
(event) => {
handleEvent(event, config, wrappedHandle);
Expand Down Expand Up @@ -264,6 +264,20 @@ function mapRows(rows, columns) {
});
}

function wrapNativeStorageError(nativeDb, error) {
const lastKvError =
typeof nativeDb.takeLastKvError === "function"
? nativeDb.takeLastKvError()
: null;
if (!lastKvError) {
throw error;
}
throw new Error(
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
{ cause: error },
);
}

async function openRawDatabaseFromEnvoy(handle, actorId) {
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
let closed = false;
Expand All @@ -288,16 +302,28 @@ async function openRawDatabaseFromEnvoy(handle, actorId) {
/\bRETURNING\b/i.test(query);

if (returnsRows) {
const result = await nativeDb.query(query, bindings);
return mapRows(result.rows, result.columns);
try {
const result = await nativeDb.query(query, bindings);
return mapRows(result.rows, result.columns);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
}

await nativeDb.run(query, bindings);
try {
await nativeDb.run(query, bindings);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
return [];
}

const result = await nativeDb.exec(query);
return mapRows(result.rows, result.columns);
try {
const result = await nativeDb.exec(query);
return mapRows(result.rows, result.columns);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
},
close: async () => {
if (closed) {
Expand Down
4 changes: 4 additions & 0 deletions rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ pub struct KvGetResult {
/// at a higher level.
#[async_trait]
pub trait SqliteKv: Send + Sync {
/// Called when a KV operation fails inside a VFS callback before the
/// original error is collapsed into a generic SQLite IO error code.
fn on_error(&self, _actor_id: &str, _error: &SqliteKvError) {}

/// Called when an actor's database is opened.
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
Ok(())
Expand Down
135 changes: 121 additions & 14 deletions rivetkit-typescript/packages/sqlite-native/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString};
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, Mutex, OnceLock};

use libsqlite3_sys::*;
use tokio::runtime::Handle;

use crate::kv;
use crate::sqlite_kv::{KvGetResult, SqliteKv};
use crate::sqlite_kv::{KvGetResult, SqliteKv, SqliteKvError};

// MARK: Panic Guard

Expand Down Expand Up @@ -158,12 +158,62 @@ struct VfsContext {
actor_id: String,
main_file_name: String,
read_cache_enabled: bool,
last_error: Mutex<Option<String>>,
rt_handle: Handle,
io_methods: Box<sqlite3_io_methods>,
vfs_metrics: Arc<VfsMetrics>,
}

impl VfsContext {
fn clear_last_error(&self) {
match self.last_error.lock() {
Ok(mut last_error) => {
*last_error = None;
}
Err(err) => {
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
}
}
}

fn set_last_error(&self, message: String) {
match self.last_error.lock() {
Ok(mut last_error) => {
*last_error = Some(message);
}
Err(err) => {
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
}
}
}

fn clone_last_error(&self) -> Option<String> {
match self.last_error.lock() {
Ok(last_error) => last_error.clone(),
Err(err) => {
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
None
}
}
}

fn take_last_error(&self) -> Option<String> {
match self.last_error.lock() {
Ok(mut last_error) => last_error.take(),
Err(err) => {
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
None
}
}
}

fn report_kv_error(&self, err: SqliteKvError) -> String {
let message = err.to_string();
self.set_last_error(message.clone());
self.kv.on_error(&self.actor_id, &err);
message
}

fn resolve_file_tag(&self, path: &str) -> Option<u8> {
if path == self.main_file_name {
return Some(kv::FILE_TAG_MAIN);
Expand All @@ -187,7 +237,10 @@ impl VfsContext {
let result = self
.rt_handle
.block_on(self.kv.batch_get(&self.actor_id, keys))
.map_err(|e| e.to_string());
.map_err(|err| self.report_kv_error(err));
if result.is_ok() {
self.clear_last_error();
}
let elapsed = start.elapsed();
tracing::debug!(
op = %format_args!("get({key_count}keys)"),
Expand All @@ -203,7 +256,10 @@ impl VfsContext {
let result = self
.rt_handle
.block_on(self.kv.batch_put(&self.actor_id, keys, values))
.map_err(|e| e.to_string());
.map_err(|err| self.report_kv_error(err));
if result.is_ok() {
self.clear_last_error();
}
let elapsed = start.elapsed();
tracing::debug!(
op = %format_args!("put({key_count}keys)"),
Expand All @@ -219,7 +275,10 @@ impl VfsContext {
let result = self
.rt_handle
.block_on(self.kv.batch_delete(&self.actor_id, keys))
.map_err(|e| e.to_string());
.map_err(|err| self.report_kv_error(err));
if result.is_ok() {
self.clear_last_error();
}
let elapsed = start.elapsed();
tracing::debug!(
op = %format_args!("del({key_count}keys)"),
Expand All @@ -234,7 +293,10 @@ impl VfsContext {
let result = self
.rt_handle
.block_on(self.kv.delete_range(&self.actor_id, start, end))
.map_err(|e| e.to_string());
.map_err(|err| self.report_kv_error(err));
if result.is_ok() {
self.clear_last_error();
}
let elapsed = start_time.elapsed();
tracing::debug!(
op = "delRange",
Expand Down Expand Up @@ -574,7 +636,10 @@ unsafe extern "C" fn kv_io_write(
let chunk_key = kv::get_chunk_key(file.file_tag, chunk_idx as u32).to_vec();
let cached_chunk = if needs_existing && ctx.read_cache_enabled {
let state = get_file_state(file.state);
state.read_cache.get(chunk_key.as_slice()).cloned()
state
.read_cache
.as_ref()
.and_then(|read_cache| read_cache.get(chunk_key.as_slice()).cloned())
} else {
None
};
Expand Down Expand Up @@ -616,7 +681,7 @@ unsafe extern "C" fn kv_io_write(
let existing_chunk = plan.cached_chunk.as_deref().or_else(|| {
plan.existing_chunk_index
.and_then(|idx| existing_chunks.get(idx))
.and_then(|value| value.as_ref())
.and_then(|value| value.as_deref())
});

let mut new_chunk = if let Some(existing_chunk) = existing_chunk {
Expand Down Expand Up @@ -1164,11 +1229,30 @@ unsafe extern "C" fn kv_vfs_current_time(_p_vfs: *mut sqlite3_vfs, p_time_out: *
}

unsafe extern "C" fn kv_vfs_get_last_error(
_p_vfs: *mut sqlite3_vfs,
_n_byte: c_int,
_z_err_msg: *mut c_char,
p_vfs: *mut sqlite3_vfs,
n_byte: c_int,
z_err_msg: *mut c_char,
) -> c_int {
vfs_catch_unwind!(SQLITE_IOERR, SQLITE_OK)
vfs_catch_unwind!(SQLITE_IOERR, {
if n_byte <= 0 || z_err_msg.is_null() {
return 0;
}

let ctx = get_vfs_ctx(p_vfs);
let last_error = ctx.clone_last_error();
let Some(message) = last_error else {
*z_err_msg = 0;
return 0;
};

let bytes = message.as_bytes();
let max_len = (n_byte as usize).saturating_sub(1);
let copy_len = bytes.len().min(max_len);
let dst = z_err_msg.cast::<u8>();
ptr::copy_nonoverlapping(bytes.as_ptr(), dst, copy_len);
*dst.add(copy_len) = 0u8;
0
})
}

// MARK: KvVfs
Expand All @@ -1183,6 +1267,10 @@ unsafe impl Send for KvVfs {}
unsafe impl Sync for KvVfs {}

impl KvVfs {
fn take_last_kv_error(&self) -> Option<String> {
unsafe { (*self.ctx_ptr).take_last_error() }
}

pub fn register(
name: &str,
kv: Arc<dyn SqliteKv>,
Expand Down Expand Up @@ -1210,6 +1298,7 @@ impl KvVfs {
actor_id: actor_id.clone(),
main_file_name: actor_id,
read_cache_enabled: read_cache_enabled(),
last_error: Mutex::new(None),
rt_handle,
io_methods: Box::new(io_methods),
vfs_metrics,
Expand Down Expand Up @@ -1279,6 +1368,10 @@ impl NativeDatabase {
pub fn as_ptr(&self) -> *mut sqlite3 {
self.db
}

pub fn take_last_kv_error(&self) -> Option<String> {
self._vfs.take_last_kv_error()
}
}

impl Drop for NativeDatabase {
Expand All @@ -1291,6 +1384,18 @@ impl Drop for NativeDatabase {
}
}

fn sqlite_error_message(db: *mut sqlite3) -> String {
unsafe {
if db.is_null() {
"unknown sqlite error".to_string()
} else {
CStr::from_ptr(sqlite3_errmsg(db))
.to_string_lossy()
.into_owned()
}
}
}

pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, String> {
let c_name = CString::new(file_name).map_err(|err| err.to_string())?;
let mut db: *mut sqlite3 = ptr::null_mut();
Expand All @@ -1304,12 +1409,13 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
)
};
if rc != SQLITE_OK {
let message = sqlite_error_message(db);
if !db.is_null() {
unsafe {
sqlite3_close(db);
}
}
return Err(format!("sqlite3_open_v2 failed with code {rc}"));
return Err(format!("sqlite3_open_v2 failed with code {rc}: {message}"));
}

for pragma in &[
Expand All @@ -1324,10 +1430,11 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
let rc =
unsafe { sqlite3_exec(db, c_sql.as_ptr(), None, ptr::null_mut(), ptr::null_mut()) };
if rc != SQLITE_OK {
let message = sqlite_error_message(db);
unsafe {
sqlite3_close(db);
}
return Err(format!("{pragma} failed with code {rc}"));
return Err(format!("{pragma} failed with code {rc}: {message}"));
}
}

Expand Down
Loading